protocol-reverse-engineering

Par wshobson · agents

Maîtrisez la rétro-ingénierie des protocoles réseau, notamment l'analyse de paquets, la dissection de protocoles et la documentation de protocoles personnalisés. À utiliser pour analyser le trafic réseau, comprendre des protocoles propriétaires ou déboguer des communications réseau.

npx skills add https://github.com/wshobson/agents --skill protocol-reverse-engineering

Ingénierie inverse de protocoles

Techniques complètes pour capturer, analyser et documenter les protocoles réseau pour la recherche en sécurité, l'interopérabilité et le débogage.

Capture de trafic

Capture Wireshark

# Capture sur interface spécifique
wireshark -i eth0 -k

# Capture avec filtre
wireshark -i eth0 -k -f "port 443"

# Capture vers fichier
tshark -i eth0 -w capture.pcap

# Capture en anneau (rotation de fichiers)
tshark -i eth0 -b filesize:100000 -b files:10 -w capture.pcap

Capture tcpdump

# Capture basique
tcpdump -i eth0 -w capture.pcap

# Avec filtre
tcpdump -i eth0 port 8080 -w capture.pcap

# Capture d'octets spécifiques
tcpdump -i eth0 -s 0 -w capture.pcap  # Paquet complet

# Affichage en temps réel
tcpdump -i eth0 -X port 80

Capture Man-in-the-Middle

# mitmproxy pour HTTP/HTTPS
mitmproxy --mode transparent -p 8080

# Interception SSL/TLS
mitmproxy --mode transparent --ssl-insecure

# Dump vers fichier
mitmdump -w traffic.mitm

# Burp Suite
# Configurer le proxy du navigateur sur 127.0.0.1:8080

Analyse de protocoles

Analyse Wireshark

# Filtres d'affichage
tcp.port == 8080
http.request.method == "POST"
ip.addr == 192.168.1.1
tcp.flags.syn == 1 && tcp.flags.ack == 0
frame contains "password"

# Suivi de flux
Clic droit > Follow > TCP Stream
Clic droit > Follow > HTTP Stream

# Export d'objets
File > Export Objects > HTTP

# Déchiffrement
Edit > Preferences > Protocols > TLS
  - (Pre)-Master-Secret log filename
  - RSA keys list

Analyse tshark

# Extraire des champs spécifiques
tshark -r capture.pcap -T fields -e ip.src -e ip.dst -e tcp.port

# Statistiques
tshark -r capture.pcap -q -z conv,tcp
tshark -r capture.pcap -q -z endpoints,ip

# Filtre et extraction
tshark -r capture.pcap -Y "http" -T json > http_traffic.json

# Hiérarchie de protocoles
tshark -r capture.pcap -q -z io,phs

Analyse personnalisée avec Scapy

from scapy.all import *

# Lire pcap
packets = rdpcap("capture.pcap")

# Analyser les paquets
for pkt in packets:
    if pkt.haslayer(TCP):
        print(f"Src: {pkt[IP].src}:{pkt[TCP].sport}")
        print(f"Dst: {pkt[IP].dst}:{pkt[TCP].dport}")
        if pkt.haslayer(Raw):
            print(f"Data: {pkt[Raw].load[:50]}")

# Filtrer les paquets
http_packets = [p for p in packets if p.haslayer(TCP)
                and (p[TCP].sport == 80 or p[TCP].dport == 80)]

# Créer des paquets personnalisés
pkt = IP(dst="target")/TCP(dport=80)/Raw(load="GET / HTTP/1.1\r\n")
send(pkt)

Identification de protocoles

Signatures de protocoles courants

HTTP        - "HTTP/1." ou "GET " ou "POST " au début
TLS/SSL     - 0x16 0x03 (couche d'enregistrement)
DNS         - Port UDP 53, format d'en-tête spécifique
SMB         - 0xFF 0x53 0x4D 0x42 (signature "SMB")
SSH         - Bannière "SSH-2.0"
FTP         - Réponse "220 ", commande "USER "
SMTP        - Bannière "220 ", commande "EHLO"
MySQL       - Préfixe de longueur 0x00, version de protocole
PostgreSQL  - 0x00 0x00 0x00 longueur au démarrage
Redis       - Préfixe de tableau RESP "*"
MongoDB     - Documents BSON avec en-tête spécifique

Motifs d'en-tête de protocole

+--------+--------+--------+--------+
|  Numéro magique / Signature       |
+--------+--------+--------+--------+
|  Version       |  Drapeaux       |
+--------+--------+--------+--------+
|  Longueur      |  Type de message|
+--------+--------+--------+--------+
|  Numéro de séquence / ID de session|
+--------+--------+--------+--------+
|  Charge utile...                  |
+--------+--------+--------+--------+

Analyse de protocoles binaires

Identification de structure

# Motifs courants dans les protocoles binaires

# Message avec préfixe de longueur
struct Message {
    uint32_t length;      # Longueur totale du message
    uint16_t msg_type;    # Identifiant de type de message
    uint8_t  flags;       # Drapeaux/options
    uint8_t  reserved;    # Remplissage/alignement
    uint8_t  payload[];   # Charge utile de longueur variable
};

# Type-Length-Value (TLV)
struct TLV {
    uint8_t  type;        # Type de champ
    uint16_t length;      # Longueur du champ
    uint8_t  value[];     # Données du champ
};

# En-tête fixe + charge utile variable
struct Packet {
    uint8_t  magic[4];    # Signature "ABCD"
    uint32_t version;
    uint32_t payload_len;
    uint32_t checksum;    # CRC32 ou similaire
    uint8_t  payload[];
};

Analyseur de protocoles Python

import struct
from dataclasses import dataclass

@dataclass
class MessageHeader:
    magic: bytes
    version: int
    msg_type: int
    length: int

    @classmethod
    def from_bytes(cls, data: bytes):
        magic, version, msg_type, length = struct.unpack(
            ">4sHHI", data[:12]
        )
        return cls(magic, version, msg_type, length)

def parse_messages(data: bytes):
    offset = 0
    messages = []

    while offset < len(data):
        header = MessageHeader.from_bytes(data[offset:])
        payload = data[offset+12:offset+12+header.length]
        messages.append((header, payload))
        offset += 12 + header.length

    return messages

# Analyser une structure TLV
def parse_tlv(data: bytes):
    fields = []
    offset = 0

    while offset < len(data):
        field_type = data[offset]
        length = struct.unpack(">H", data[offset+1:offset+3])[0]
        value = data[offset+3:offset+3+length]
        fields.append((field_type, value))
        offset += 3 + length

    return fields

Analyse de dump hexadécimal

def hexdump(data: bytes, width: int = 16):
    """Formater les données binaires en dump hexadécimal."""
    lines = []
    for i in range(0, len(data), width):
        chunk = data[i:i+width]
        hex_part = ' '.join(f'{b:02x}' for b in chunk)
        ascii_part = ''.join(
            chr(b) if 32 <= b < 127 else '.'
            for b in chunk
        )
        lines.append(f'{i:08x}  {hex_part:<{width*3}}  {ascii_part}')
    return '\n'.join(lines)

# Exemple de sortie :
# 00000000  48 54 54 50 2f 31 2e 31  20 32 30 30 20 4f 4b 0d  HTTP/1.1 200 OK.
# 00000010  0a 43 6f 6e 74 65 6e 74  2d 54 79 70 65 3a 20 74  .Content-Type: t

Analyse du chiffrement

Identification du chiffrement

# Analyse d'entropie - une haute entropie suggère du chiffrement/compression
import math
from collections import Counter

def entropy(data: bytes) -> float:
    if not data:
        return 0.0
    counter = Counter(data)
    probs = [count / len(data) for count in counter.values()]
    return -sum(p * math.log2(p) for p in probs)

# Seuils d'entropie :
# < 6,0 : Probablement du texte brut ou données structurées
# 6,0-7,5 : Possiblement compressé
# > 7,5 : Probablement chiffré ou aléatoire

# Indicateurs de chiffrement courants
# - Entropie élevée et uniforme
# - Aucune structure ou motif évident
# - Longueur souvent multiple de la taille de bloc (16 pour AES)
# - IV possible au début (16 octets pour AES-CBC)

Analyse TLS

# Extraire les métadonnées TLS
tshark -r capture.pcap -Y "ssl.handshake" \
    -T fields -e ip.src -e ssl.handshake.ciphersuite

# Empreinte JA3 (client)
tshark -r capture.pcap -Y "ssl.handshake.type == 1" \
    -T fields -e ssl.handshake.ja3

# Empreinte JA3S (serveur)
tshark -r capture.pcap -Y "ssl.handshake.type == 2" \
    -T fields -e ssl.handshake.ja3s

# Extraction de certificat
tshark -r capture.pcap -Y "ssl.handshake.certificate" \
    -T fields -e x509sat.printableString

Approches de déchiffrement

# Fichier journal de pre-master secret (navigateur)
export SSLKEYLOGFILE=/tmp/keys.log

# Configurer Wireshark
# Edit > Preferences > Protocols > TLS
# (Pre)-Master-Secret log filename: /tmp/keys.log

# Déchiffrer avec clé privée (si disponible)
# Fonctionne seulement pour l'échange de clé RSA
# Edit > Preferences > Protocols > TLS > RSA keys list

Documentation de protocoles personnalisés

Modèle de spécification de protocole

# Spécification du protocole Name

## Vue d'ensemble

Brève description de l'objectif et de la conception du protocole.

## Transport

- Couche : TCP/UDP
- Port : XXXX
- Chiffrement : TLS 1.2+

## Format de message

### En-tête (12 octets)

| Décalage | Taille | Champ   | Description                |
| -------- | ------ | ------- | -------------------------- |
| 0        | 4      | Magic   | 0x50524F54 ("PROT")        |
| 4        | 2      | Version | Version de protocole (1)   |
| 6        | 2      | Type    | Identifiant de type message|
| 8        | 4      | Length  | Longueur de charge utile   |

### Types de message

| Type | Nom       | Description                |
| ---- | --------- | -------------------------- |
| 0x01 | HELLO     | Initiation de connexion    |
| 0x02 | HELLO_ACK | Connexion acceptée         |
| 0x03 | DATA      | Données d'application      |
| 0x04 | CLOSE     | Terminaison de connexion   |

### Type 0x01 : HELLO

| Décalage | Taille | Champ      | Description                |
| -------- | ------ | ---------- | -------------------------- |
| 0        | 4      | ClientID   | Identifiant unique client  |
| 4        | 2      | Flags      | Drapeaux de connexion      |
| 6        | var    | Extensions | Extensions codées en TLV   |

## Machine d'état

[INIT] --HELLO--> [WAIT_ACK] --HELLO_ACK--> [CONNECTED] | DATA/DATA | [CLOSED] <--CLOSE--+


## Exemples

### Établissement de connexion

Client -> Serveur : HELLO (ClientID=0x12345678) Serveur -> Client : HELLO_ACK (Status=OK) Client -> Serveur : DATA (charge utile)


## Dissecteur Wireshark (Lua)

```lua
-- custom_protocol.lua
local proto = Proto("custom", "Custom Protocol")

-- Définir les champs
local f_magic = ProtoField.string("custom.magic", "Magic")
local f_version = ProtoField.uint16("custom.version", "Version")
local f_type = ProtoField.uint16("custom.type", "Type")
local f_length = ProtoField.uint32("custom.length", "Length")
local f_payload = ProtoField.bytes("custom.payload", "Payload")

proto.fields = { f_magic, f_version, f_type, f_length, f_payload }

-- Noms de types de messages
local msg_types = {
    [0x01] = "HELLO",
    [0x02] = "HELLO_ACK",
    [0x03] = "DATA",
    [0x04] = "CLOSE"
}

function proto.dissector(buffer, pinfo, tree)
    pinfo.cols.protocol = "CUSTOM"

    local subtree = tree:add(proto, buffer())

    -- Analyser l'en-tête
    subtree:add(f_magic, buffer(0, 4))
    subtree:add(f_version, buffer(4, 2))

    local msg_type = buffer(6, 2):uint()
    subtree:add(f_type, buffer(6, 2)):append_text(
        " (" .. (msg_types[msg_type] or "Unknown") .. ")"
    )

    local length = buffer(8, 4):uint()
    subtree:add(f_length, buffer(8, 4))

    if length > 0 then
        subtree:add(f_payload, buffer(12, length))
    end
end

-- Enregistrer pour le port TCP
local tcp_table = DissectorTable.get("tcp.port")
tcp_table:add(8888, proto)

Tests actifs

Fuzzing avec Boofuzz

from boofuzz import *

def main():
    session = Session(
        target=Target(
            connection=TCPSocketConnection("target", 8888)
        )
    )

    # Définir la structure du protocole
    s_initialize("HELLO")
    s_static(b"\x50\x52\x4f\x54")  # Magic
    s_word(1, name="version")       # Version
    s_word(0x01, name="type")       # Type (HELLO)
    s_size("payload", length=4)     # Champ de longueur
    s_block_start("payload")
    s_dword(0x12345678, name="client_id")
    s_word(0, name="flags")
    s_block_end()

    session.connect(s_get("HELLO"))
    session.fuzz()

if __name__ == "__main__":
    main()

Rejeu et modification

from scapy.all import *

# Rejouer le trafic capturé
packets = rdpcap("capture.pcap")
for pkt in packets:
    if pkt.haslayer(TCP) and pkt[TCP].dport == 8888:
        send(pkt)

# Modifier et rejouer
for pkt in packets:
    if pkt.haslayer(Raw):
        # Modifier la charge utile
        original = pkt[Raw].load
        modified = original.replace(b"client", b"CLIENT")
        pkt[Raw].load = modified
        # Recalculer les checksums
        del pkt[IP].chksum
        del pkt[TCP].chksum
        send(pkt)

Meilleures pratiques

Flux de travail d'analyse

  1. Capturer le trafic : Plusieurs sessions, différents scénarios
  2. Identifier les limites : Marqueurs de début/fin de message
  3. Mapper la structure : En-tête fixe, charge utile variable
  4. Identifier les champs : Comparer plusieurs échantillons
  5. Documenter le format : Créer une spécification
  6. Valider la compréhension : Implémenter analyseur/générateur
  7. Tester les cas limites : Fuzzing, conditions limites

Motifs courants à rechercher

  • Nombres magiques/signatures au début du message
  • Champs de version pour la compatibilité
  • Champs de longueur (souvent avant les données variables)
  • Champs de type/opcode pour l'identification de message
  • Numéros de séquence pour l'ordonnancement
  • Checksums/CRC pour l'intégrité
  • Timestamps pour le timing
  • Identifiants de session/connexion

Skills similaires