Невозможно декодировать пользовательский заголовок безопасности IPv6 во время повторной сборкиPython

Программы на Python
Ответить
Anonymous
 Невозможно декодировать пользовательский заголовок безопасности IPv6 во время повторной сборки

Сообщение Anonymous »

В настоящее время я работаю над защитой фрагментов пакетов IPv6 с помощью специального заголовка безопасности, содержащего криптографическую информацию, такую ​​как MAC и nonce. Вот общий процесс, которому я следую:
Процесс фрагментации:
1. Разбить пакет на фрагменты:
Пакет разбивается на основе MTU сети. Каждый фрагмент включает в себя:
a) Стандартный заголовок IPv6
Стандартный заголовок фрагмента
b) Настраиваемый заголовок безопасности, содержащий:
- MAC: обеспечивает целостность полезной нагрузки.
- Nonce: уникальный идентификатор для каждого пакета.
- Тип безопасности: флаг, указывающий на шифрование или аутентификацию.
2. Прикрепите заголовки и отправьте фрагменты:
Каждый фрагмент включает в себя пользовательский заголовок безопасности (сериализованный в формате JSON) и полезную нагрузку.

```
< strong>Процесс сборки:
1.Фрагменты принимаются в произвольном порядке.
2. Пользовательский заголовок безопасности каждого фрагмента анализируется и проверяется:
- Проверка MAC с использованием HMAC-SHA256.
- Проверка согласованности Nonce, чтобы убедиться, что фрагменты принадлежат одному и тому же пакету.
3. После проверки всех фрагментов , они буферизуются, сортируются и повторно собираются.
```
Проблема:
Во время повторная сборка, Я не могу правильно декодировать пользовательский заголовок безопасности. Логика повторной сборки дает сбой при анализе заголовка из полезных данных фрагмента.
Логика фрагментации:

Код: Выделить всё

from scapy.all import *
import hashlib
import hmac
import json
import os

# Function to generate a MAC for fragment integrity
def generate_mac(data, key):
"""Generate HMAC-SHA256 for integrity."""
return hmac.new(key.encode(), data, hashlib.sha256).hexdigest()

# Function to create a custom IPv6 Security Extension Header (serialized as JSON)
def create_security_header(nonce, mac, encryption_flag):
"""Create a custom security header and serialize it."""
header = {
"Next Header": 44,  # Fragment Header
"Header Length": 4,  # Example fixed size (4 = 32 bytes)
"Security Type": 1,  # 1 = MAC-based integrity
"Encryption Flag": encryption_flag,
"Nonce": nonce,
"MAC": mac
}
return header  # Return the header dictionary

# Function to fragment an IPv6 packet and save the fragments to a PCAP file
def fragment_packet(packet, max_size, filename, key="secretkey"):
"""
Fragment the packet, attach security headers, and save to PCAP.
:param packet: The IPv6 packet to fragment.
:param max_size: The MTU size for fragmentation.
:param filename: The name of the .pcap file to save fragments.
:param key: The key used for MAC generation.
:return: List of fragments.
"""
fragments = []
fragment_size = max_size - 40  # IPv6 header size is 40 bytes
payload = bytes(packet[Raw]) if Raw in packet else b""
total_payload_len = len(payload)
nonce = os.urandom(4).hex()  # Generate a 32-bit random nonce

for i in range(0, total_payload_len, fragment_size):
fragment_payload = payload[i:i + fragment_size]

# Generate MAC for this fragment
mac = generate_mac(fragment_payload, key)

# Create the custom security header and serialize it to JSON
sec_header = create_security_header(nonce, mac, encryption_flag=0)

# Serialize the security header as JSON and ensure proper encoding
security_header_json = json.dumps(sec_header)

# Create the fragment header
frag_header = IPv6(dst=packet[IPv6].dst) / \
IPv6ExtHdrFragment(id=12345, offset=i // 8,
m=1 if i + fragment_size < total_payload_len else 0)

# Embed the serialized JSON security header and the fragment payload together
frag_with_sec = frag_header / Raw(load=security_header_json.encode() + fragment_payload)

fragments.append(frag_with_sec)

# Save all fragments to a PCAP file
wrpcap(filename, fragments)
print(f"Fragments saved to {filename}")

return fragments

# Example usage
if __name__ == "__main__":
# Create a large IPv6 packet
large_packet = IPv6(dst="2001:db8::1") / UDP(sport=1234, dport=80) / Raw(load="A"  * 3000)

# Fragment the packet and save to PCAP
max_mtu = 1280  # IPv6 MTU
fragments = fragment_packet(large_packet, max_mtu, "fragments.pcap")

# Optionally, send the fragments
print("Sending fragments...")
for frag in fragments:
frag.show()
send(frag)
Логика сборки:

Код: Выделить всё

from scapy.all import *
import hashlib
import hmac
import json

# Function to verify the MAC of a fragment
def verify_mac(fragment_payload, key, received_mac):
"""Verify the MAC of a fragment's payload."""
calculated_mac = hmac.new(key.encode(), fragment_payload, hashlib.sha256).hexdigest()
return calculated_mac == received_mac

def parse_security_header(sec_header_raw):
"""Parse the security header, which is expected to be in JSON byte format"""
if not sec_header_raw:
print("Error: Security header is empty or malformed.")
return None
try:
# Inspecting the raw security header data before parsing
print(f"Raw security header data: {sec_header_raw[:256]}")  # Inspect the first 256 bytes
return json.loads(sec_header_raw.decode())  # Convert the byte string back to a dictionary
except json.JSONDecodeError as e:
print(f"Error: Failed to decode security header. {e}")
return None

# Function to reassemble fragments
def reassemble_fragments(fragments, key="secretkey"):
"""Reassemble fragments into the original payload."""
fragments.sort(key=lambda frag: frag[IPv6ExtHdrFragment].offset)  # Sort by offset
payload = b""  # Placeholder for reassembled payload
nonce = None  # Store the nonce for consistency check

for frag in fragments:
# Extract the raw payload and security header (the first part of the fragment)
raw_payload = bytes(frag[Raw])  # Extract the raw payload from the fragment

# Extract the security header (first part of the raw payload)
header_len = len(json.dumps({"Next Header": 44, "Header Length": 4, "Security Type": 1, "Encryption Flag": 0}).encode())
sec_header = parse_security_header(raw_payload[:header_len])  # Adjusted to handle proper header length
fragment_payload = raw_payload[header_len:]  # The actual fragment payload starts after the security header

# Ensure security header is valid
if sec_header is None:
print(f"Invalid security header for fragment with offset {frag[IPv6ExtHdrFragment].offset}")
return None

# Verify MAC
if not verify_mac(fragment_payload, key, sec_header["MAC"]):
print(f"MAC verification failed for fragment with offset {frag[IPv6ExtHdrFragment].offset}")
return None

# Check nonce (ensure all fragments belong to the same packet)
if nonce is None:
nonce = sec_header["Nonce"]  # Set nonce from the first fragment
elif nonce != sec_header["Nonce"]:
print(f"Nonce mismatch for fragment with offset {frag[IPv6ExtHdrFragment].offset}")
return None

# Append fragment payload to the full payload
payload += fragment_payload

return payload

# Example usage
if __name__ == "__main__":
# Simulate receiving fragments
fragments = rdpcap("fragments.pcap")  # Read the fragments from the PCAP file

# Reassemble fragments
print("Reassembling fragments...")
reassembled_payload = reassemble_fragments(fragments, key="secretkey")

if reassembled_payload:
print("Packet reassembled successfully:")
print(reassembled_payload.decode())  # Decode and print the reassembled packet payload
else:
print("Failed to reassemble packet.")
Что я пробовал:
Проверка необработанного заголовка безопасности:
Я использовал print(sec_header_raw[ :256]), чтобы изучить данные. Кажется, имеется дополнительное заполнение или несовпадение.
Расчет длины заголовка:
Я явно установил ожидаемый размер заголовка, чтобы он соответствовал размеру сериализованного JSON.
Декодирование JSON:
Ошибка возникает во время json.loads() при анализе заголовка безопасности. Похоже, что необработанные полезные данные содержат неожиданные байты.
Вопросы

[*]Как я могу убедиться, что что пользовательский заголовок безопасности (в кодировке JSON) правильно извлекается из полезных данных фрагмента?
[*]Может ли проблема быть вызвана неправильным выравниванием или заполнением байтов при встраивании заголовка и полезных данных? >


Подробнее здесь: https://stackoverflow.com/questions/792 ... reassembly
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»