Я работаю над проектом, который встраивает зашифрованный текст в файл PNG с использованием LSB. Моя функция lsb_encode работает, IV правильно сохраняется в метаданных (я проверял), и в целом файл пригоден для использования.
Моя основная проблема заключается в том, что я пытаюсь повернуть вспять процесс. Я получаю ошибку в заголовке, используя эту команду Curl -> Curl -X POST -F "encoded_image=@C:/Users/PATH/output_image.png" -F "delimiter=example delimiter" ``http://127.0. 0.1:5000/png_decode.
Я не разработчик, и это один из моих первых «крупных» проектов. Я работал над этим.
import logging
import numpy as np
from PIL import Image, PngImagePlugin, JpegImagePlugin
import hashlib
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives import hashes, padding
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import io
from io import BytesIO
import os
from flask import Flask, url_for, jsonify, send_file, request
# Code is test only right now. IF it works as intended it should be tuned to adapt to a web service.
app = Flask(__name__)
logging.basicConfig(level=logging.DEBUG)
master_key = 'swlLZ3oalZ5w_sk9ZCgUgmkH7PZLJQqt76D51a5kdAM='
#@app.route('/png_encode', methods=['POST'])
def lsb_encoder(file, message, delimiter, iv):
# image load and convert to array
try:
with Image.open(file) as img:
if img.mode != 'RGB':
img = img.convert('RGB')
encoded_image = img.copy()
width, height = img.size
logging.debug("Image size: %dx%d", width, height)
#Prepare binary message including delimiter
binary_message = ''.join(format(byte, '08b') for byte in message)
binary_delimiter = ''.join(format(byte, '08b') for byte in delimiter)
binary_message += binary_delimiter
logging.debug("Binary message length in bits: %d", len(binary_message))
# Check if message fits in image
if len(binary_message) > width * height * 3:
return None, jsonify({"error": "Message cannot be fully hidden in image"})
pixels = encoded_image.load()
message_index = 0
# Encode message into image
for x in range(width):
for y in range(height):
if message_index >= len(binary_message):
break
r, g, b = pixels[x, y]
for i in range(3):
if message_index >= len(binary_message):
break
bit = int(binary_message[message_index])
if i == 0:
r = (r & ~1) | bit
elif i == 1:
g = (g & ~1) | bit
else:
b = (b & ~1) | bit
message_index += 1
pixels[x, y] = (r, g, b)
output_buffer = io.BytesIO()
png_info = PngImagePlugin.PngInfo()
png_info.add_text("iv", iv.hex())
encoded_image.save(output_buffer, pnginfo=png_info, format="PNG", optimize=False) # compress_level=0 If facing image size related errors, set optimize to False
output_buffer.seek(0)
logging.debug("Encoded image size after saving %d KB", output_buffer.tell()//1024)
return output_buffer, None
except Exception as e:
logging.error("Error during encoding: %s", e)
return None, jsonify({"error": "Failed to process and encode image"})
#########################################################################################################################################################################
#@app.route('/png_decode', methods=['POST'])
def lsb_decoder(file):
try:
file.seek(0)
with Image.open(file) as img:
logging.debug("Image opened successfully in lsb_decoder.")
if img.mode != 'RGB':
img = img.convert('RGB')
width, height = img.size
logging.debug("Encoded image size: %dx%d", width, height)
pixels = img.load()
binary_message = ''
# Extract LSB bits to form binary string
for x in range(width):
for y in range(height):
r, g, b = pixels[x, y]
binary_message += str(r & 1)
binary_message += str(g & 1)
binary_message += str(b & 1)
# Convert binary string to bytes
binary_message = [binary_message[i:i+8] for i in range(0, len(binary_message), 8)]
message_bytes = bytearray()
for byte in binary_message:
if len(byte) < 8:
continue # Skip if not a full byte
message_bytes.append(int(byte, 2))
# Return the extracted bytes (ciphertext + delimiter), without IV
return bytes(message_bytes), None
except Exception as e:
logging.error("Error during extraction: %s", e)
return None, str(e)
# return jsonify({"message": decoded_message}), 200
@app.route('/png_encode', methods=['POST'])
def data_preprocessing():
file = request.files.get('image') #get image from upload
message = request.form.get('message') #get message to be stegoed into image
delimiter = request.form.get('delimiter') #get delimiter to be hashed for derived key and stegoed in the image
if not file or not message or not delimiter:
return jsonify({"error": "Image, message and delimiter are required"}), 400
hashed_delimiter = hashlib.sha256(delimiter.encode('utf8')).digest() #hash delimiter
iv = os.urandom(16)
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32, #32 bytes = 256 bit key for AES-256
salt = hashed_delimiter,
iterations=10000,
backend=default_backend()
)#derive key for encryption of message
derived_key = kdf.derive(master_key.encode('utf8'))
message_bytes = message.encode('utf8')
#AES cipher in CBC mode
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend())
#Padding to make it multiple of 16 bytes
padder = padding.PKCS7(algorithms.AES.block_size).padder()
padded_message = padder.update(message_bytes) + padder.finalize()
#Encrypt the padded message
encryptor = cipher.encryptor()
ciphertext = encryptor.update(padded_message) + encryptor.finalize()
encoded_image, error_response = lsb_encoder(file, ciphertext, hashed_delimiter, iv) #error_response
if error_response:
return error_response
return send_file(encoded_image, mimetype="image/png", as_attachment=True, download_name="encoded_image.png")
@app.route('/png_decode', methods=['POST'])
def data_deciphering():
encoded_image = request.files.get('encoded_image') # Get the encoded image from upload
delimiter = request.form.get('delimiter') # Get the delimiter used for hashing
if not encoded_image or not delimiter:
return jsonify({"error": "Image and delimiter are required"}), 400
hashed_delimiter = hashlib.sha256(delimiter.encode('utf8')).digest() # Hash the delimiter
# Decode the image to extract the ciphertext and delimiter (IV not included here)
ciphertext_with_delimiter, error_response = lsb_decoder(encoded_image)
if error_response:
return jsonify({"error": error_response}), 400
# Extract IV from image metadata in data_deciphering
try:
with Image.open(encoded_image) as img:
iv_hex = img.info.get("iv", None)
if iv_hex is None:
return jsonify({"error": "IV not found in image metadata"}), 400
iv = bytes.fromhex(iv_hex)
except Exception as e:
logging.error("Error reading IV from image metadata: %s", e)
return jsonify({"error": "Failed to extract IV from image"}), 400
# Derive key for decryption
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32, # 32 bytes = 256 bit key for AES-256
salt=hashed_delimiter,
iterations=10000,
backend=default_backend()
)
derived_key = kdf.derive(master_key.encode('utf8'))
# AES cipher in CBC mode for decryption
cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend())
decryptor = cipher.decryptor()
# Decrypt the ciphertext
padded_message = decryptor.update(ciphertext_with_delimiter) + decryptor.finalize()
# Unpadding to get the original message
unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder()
original_message = unpadder.update(padded_message) + unpadder.finalize()
message_length = len(original_message) - 32
original_message = original_message[:message_length].decode('utf8')
return jsonify({"message": original_message}), 200
#########################################################################################################################################################################
if __name__ == '__main__':
app.run(debug=True)
Добавлено несколько операторов журналирования, чтобы попытаться отследить сбой процесса, но безуспешно.
Я работаю над проектом, который встраивает зашифрованный текст в файл PNG с использованием LSB. Моя функция lsb_encode работает, IV правильно сохраняется в метаданных (я проверял), и в целом файл пригоден для использования. Моя основная проблема заключается в том, что я пытаюсь повернуть вспять процесс. Я получаю ошибку в заголовке, используя эту команду Curl -> Curl -X POST -F "encoded_image=@C:/Users/PATH/output_image.png" -F "delimiter=example delimiter" ``http://127.0. 0.1:5000/png_decode. Я не разработчик, и это один из моих первых «крупных» проектов. Я работал над этим. [code]import logging import numpy as np from PIL import Image, PngImagePlugin, JpegImagePlugin import hashlib from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from cryptography.hazmat.primitives import hashes, padding from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes import io from io import BytesIO import os from flask import Flask, url_for, jsonify, send_file, request # Code is test only right now. IF it works as intended it should be tuned to adapt to a web service.
#@app.route('/png_encode', methods=['POST']) def lsb_encoder(file, message, delimiter, iv): # image load and convert to array try: with Image.open(file) as img: if img.mode != 'RGB': img = img.convert('RGB') encoded_image = img.copy() width, height = img.size logging.debug("Image size: %dx%d", width, height)
#Prepare binary message including delimiter binary_message = ''.join(format(byte, '08b') for byte in message) binary_delimiter = ''.join(format(byte, '08b') for byte in delimiter) binary_message += binary_delimiter logging.debug("Binary message length in bits: %d", len(binary_message))
# Check if message fits in image if len(binary_message) > width * height * 3: return None, jsonify({"error": "Message cannot be fully hidden in image"})
pixels = encoded_image.load() message_index = 0
# Encode message into image for x in range(width): for y in range(height): if message_index >= len(binary_message): break
r, g, b = pixels[x, y]
for i in range(3): if message_index >= len(binary_message): break
bit = int(binary_message[message_index]) if i == 0: r = (r & ~1) | bit elif i == 1: g = (g & ~1) | bit else: b = (b & ~1) | bit
message_index += 1
pixels[x, y] = (r, g, b)
output_buffer = io.BytesIO() png_info = PngImagePlugin.PngInfo() png_info.add_text("iv", iv.hex()) encoded_image.save(output_buffer, pnginfo=png_info, format="PNG", optimize=False) # compress_level=0 If facing image size related errors, set optimize to False output_buffer.seek(0) logging.debug("Encoded image size after saving %d KB", output_buffer.tell()//1024) return output_buffer, None
except Exception as e: logging.error("Error during encoding: %s", e) return None, jsonify({"error": "Failed to process and encode image"})
#@app.route('/png_decode', methods=['POST']) def lsb_decoder(file): try: file.seek(0) with Image.open(file) as img: logging.debug("Image opened successfully in lsb_decoder.") if img.mode != 'RGB': img = img.convert('RGB') width, height = img.size logging.debug("Encoded image size: %dx%d", width, height)
pixels = img.load() binary_message = ''
# Extract LSB bits to form binary string for x in range(width): for y in range(height): r, g, b = pixels[x, y] binary_message += str(r & 1) binary_message += str(g & 1) binary_message += str(b & 1)
# Convert binary string to bytes binary_message = [binary_message[i:i+8] for i in range(0, len(binary_message), 8)] message_bytes = bytearray()
for byte in binary_message: if len(byte) < 8: continue # Skip if not a full byte message_bytes.append(int(byte, 2)) # Return the extracted bytes (ciphertext + delimiter), without IV return bytes(message_bytes), None
except Exception as e: logging.error("Error during extraction: %s", e) return None, str(e)
@app.route('/png_encode', methods=['POST']) def data_preprocessing(): file = request.files.get('image') #get image from upload message = request.form.get('message') #get message to be stegoed into image delimiter = request.form.get('delimiter') #get delimiter to be hashed for derived key and stegoed in the image
if not file or not message or not delimiter: return jsonify({"error": "Image, message and delimiter are required"}), 400
hashed_delimiter = hashlib.sha256(delimiter.encode('utf8')).digest() #hash delimiter iv = os.urandom(16)
kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, #32 bytes = 256 bit key for AES-256 salt = hashed_delimiter, iterations=10000, backend=default_backend() )#derive key for encryption of message derived_key = kdf.derive(master_key.encode('utf8'))
message_bytes = message.encode('utf8') #AES cipher in CBC mode cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend()) #Padding to make it multiple of 16 bytes padder = padding.PKCS7(algorithms.AES.block_size).padder() padded_message = padder.update(message_bytes) + padder.finalize() #Encrypt the padded message encryptor = cipher.encryptor() ciphertext = encryptor.update(padded_message) + encryptor.finalize()
@app.route('/png_decode', methods=['POST']) def data_deciphering(): encoded_image = request.files.get('encoded_image') # Get the encoded image from upload delimiter = request.form.get('delimiter') # Get the delimiter used for hashing
if not encoded_image or not delimiter: return jsonify({"error": "Image and delimiter are required"}), 400
hashed_delimiter = hashlib.sha256(delimiter.encode('utf8')).digest() # Hash the delimiter
# Decode the image to extract the ciphertext and delimiter (IV not included here) ciphertext_with_delimiter, error_response = lsb_decoder(encoded_image) if error_response: return jsonify({"error": error_response}), 400
# Extract IV from image metadata in data_deciphering try: with Image.open(encoded_image) as img: iv_hex = img.info.get("iv", None) if iv_hex is None: return jsonify({"error": "IV not found in image metadata"}), 400 iv = bytes.fromhex(iv_hex) except Exception as e: logging.error("Error reading IV from image metadata: %s", e) return jsonify({"error": "Failed to extract IV from image"}), 400
# Derive key for decryption kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, # 32 bytes = 256 bit key for AES-256 salt=hashed_delimiter, iterations=10000, backend=default_backend() ) derived_key = kdf.derive(master_key.encode('utf8'))
# AES cipher in CBC mode for decryption cipher = Cipher(algorithms.AES(derived_key), modes.CBC(iv), backend=default_backend()) decryptor = cipher.decryptor()
# Decrypt the ciphertext padded_message = decryptor.update(ciphertext_with_delimiter) + decryptor.finalize()
# Unpadding to get the original message unpadder = padding.PKCS7(algorithms.AES.block_size).unpadder() original_message = unpadder.update(padded_message) + unpadder.finalize() message_length = len(original_message) - 32 original_message = original_message[:message_length].decode('utf8') return jsonify({"message": original_message}), 200
if __name__ == '__main__': app.run(debug=True) [/code] Добавлено несколько операторов журналирования, чтобы попытаться отследить сбой процесса, но безуспешно.