Реализация Хаффмана сокращает время декомпрессииPython

Программы на Python
Ответить
Anonymous
 Реализация Хаффмана сокращает время декомпрессии

Сообщение Anonymous »

Я пытаюсь улучшить скорость вычислений этого хаффмана. Для небольших входных шестнадцатеричных строк это нормально, но чем больше входная строка, тем значительно увеличивается время при достаточно большой скорости строки (пример ниже) до x50 1 мс против 55 мс+

Код: Выделить всё

import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple, Optional
import numpy as np
from array import array
import ctypes
from line_profiler._line_profiler import byteorder

class Node:
__slots__ = ['char', 'freq', 'left', 'right']

def __init__(self, char: str, freq: int, left=None, right=None):
self.char = char
self.freq = freq
self.left = left
self.right = right

class HybridLookupTable:

"""Hybrid approach combining direct lookup for short codes and binary search for long codes"""

__slots__ = ['short_table', 'long_codes', 'max_short_bits']

def __init__(self, max_short_bits: int = 8):
self.max_short_bits = max_short_bits
self.short_table = [(None, 0)] * (1  None:
code_int = int(code, 2)
code_len = len(code)

if code_len > 3
bit_offset = start_bit & 7
# Pre-fill buffer
for _ in range(8):
if bytes_processed < len(data):
buffer.add_byte(data[bytes_processed])
bytes_processed += 1
# Skip initial bit offset
if bit_offset:
buffer.consume_bits(bit_offset)

while pos <  end_bit and buffer.bits_in_buffer >= 8:
# Try lookup table first (optimized for 8-bit codes)
lookup_bits = buffer.peek_bits(8)
char_info = self.lookup_table.lookup(lookup_bits, 8)

if char_info:
char, code_len = char_info
buffer.consume_bits(code_len)
result.append(char)
pos += code_len
else:
# Fall back to tree traversal
node = self.tree
while node.left and node.right and buffer.bits_in_buffer > 0:
bit = buffer.peek_bits(1)
buffer.consume_bits(1)
node = node.right if bit else node.left
pos += 1
if not (node.left or node.right):
result.append(node.char)

# Refill buffer if needed
while buffer.bits_in_buffer  1:
freq1, _, node1 = nodes.pop()
freq2, _, node2 = nodes.pop()

# Create parent node
parent = Node(node1.char + node2.char, freq1 + freq2, node1, node2)
nodes.append((freq1 + freq2, len(nodes), parent))
nodes.sort(reverse=True)

self.tree = nodes[0][2] if nodes else None
self._build_codes(self.tree)

def _build_codes(self, node: Node, code: str = '') -> None:

"""Build lookup table using depth-first traversal"""

if not node:
return
if not node.left and not node.right:
if code:  # Never store empty codes
self.lookup_table.add_code(code, node.char)
return
if node.left:
self._build_codes(node.left, code + '0')
if node.right:
self._build_codes(node.right, code + '1')

def _parse_header_fast(self, data: memoryview) -> int:

"""Optimized header parsing"""

pos = 12  # Skip first 12 bytes (file_len, always0, chars_count)
chars_count = int.from_bytes(data[8:12], byteorder)

# Pre-allocate dictionary space
self.freqs = {}
self.freqs.clear()

# Process all characters in a single loop
for _ in range(chars_count):
count = int.from_bytes(data[pos:pos + 4], byteorder)
char = chr(data[pos + 4])  # Faster than decode
self.freqs[char] = count
pos += 8
return pos

def _decode_bits_parallel(self, data: memoryview, total_bits: int) -> str:

"""Parallel decoding using multiple threads"""

chunk_bits = (total_bits + self.num_threads - 1) // self.num_threads
chunks = []

# Create chunks ensuring they align with byte boundaries when possible
for i in range(0, total_bits, chunk_bits):
end_bit = min(i + chunk_bits, total_bits)
if i > 0:
# Align to byte boundary when possible
while (i & 7) != 0 and i >  0:
i -= 1
chunks.append((i, end_bit))

# Create decoders for each thread
decoders = [
ChunkDecoder(self.lookup_table, self.tree, self.chunk_size)
for _ in range(len(chunks))
]

# Process chunks in parallel
with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
futures = [
executor.submit(decoder.decode_chunk, data, start, end)
for decoder, (start, end) in zip(decoders, chunks)
]

# Collect results
results = []
for future in futures:
chunk_result, _ = future.result()
results.extend(chunk_result)

return ''.join(results)

def _decode_bits_optimized(self, data: memoryview, total_bits: int) -> str:

"""Optimized single-threaded decoding for small inputs"""

if total_bits > self.chunk_size:
return self._decode_bits_parallel(data, total_bits)

result = []
buffer = BitBuffer()
pos = 0
bytes_processed = 0
# Pre-fill buffer
while bytes_processed < min(8, len(data)):
buffer.add_byte(data[bytes_processed])
bytes_processed += 1
while pos < total_bits:
# Use lookup table for common patterns
if buffer.bits_in_buffer >= 8:
lookup_bits = buffer.peek_bits(8)
char_info = self.lookup_table.lookup(lookup_bits, 8)

if char_info:
char, code_len = char_info
buffer.consume_bits(code_len)
result.append(char)
pos += code_len
else:
# Tree traversal for uncommon patterns
node = self.tree
while node.left and node.right and buffer.bits_in_buffer > 0:
bit = buffer.peek_bits(1)
buffer.consume_bits(1)
node = node.right if bit else node.left
pos += 1
if not (node.left or node.right):
result.append(node.char)

# Refill buffer
while buffer.bits_in_buffer  str:
# Use numpy for faster hex decoding
clean_hex = hex_string.replace(' ', '')
data = np.frombuffer(bytes.fromhex(clean_hex), dtype=np.uint8)
return self.decode_bytes(data.tobytes())

def decode_bytes(self, data: bytes) -> str:
view = memoryview(data)
pos = self._parse_header_fast(view)

self._build_efficient_tree()

# Get packed data info using numpy for faster parsing
header = np.frombuffer(data[pos:pos + 12], dtype=np.uint32)
packed_bits = int(header[0])
packed_bytes = int(header[1])
pos += 12
# Choose decoding method based on size
if packed_bits > self.chunk_size:
return self._decode_bits_parallel(view[pos:pos + packed_bytes], packed_bits)
else:
return self._decode_bits_optimized(view[pos:pos + packed_bytes], packed_bits)

def encode(self, text: str) -> bytes:

"""Encode text using Huffman coding - for testing purposes"""

# Count frequencies
self.freqs = {}
for char in text:
self.freqs[char] = self.freqs.get(char, 0) + 1
# Build tree and codes
self._build_efficient_tree()

# Convert text to bits
bits = []
for char in text:
code = self.lookup_table.get_code(char)
bits.extend(code)

# Pack bits into bytes
packed_bytes = []
for i in range(0, len(bits), 8):
byte = 0
for j in range(min(8, len(bits) - i)):
if bits[i + j]:
byte |= 1 

Подробнее здесь: [url]https://stackoverflow.com/questions/79174751/huffman-implementation-reduce-decompression-time[/url]
Ответить

Быстрый ответ

Изменение регистра текста: 
Смайлики
:) :( :oops: :roll: :wink: :muza: :clever: :sorry: :angel: :read: *x)
Ещё смайлики…
   
К этому ответу прикреплено по крайней мере одно вложение.

Если вы не хотите добавлять вложения, оставьте поля пустыми.

Максимально разрешённый размер вложения: 15 МБ.

Вернуться в «Python»