Я пытаюсь ускорить вычисления в этой реализации кода Хаффмана. Для небольших входных шестнадцатеричных строк всё работает нормально, но чем длиннее входная строка, тем сильнее растёт время: на достаточно длинной строке (пример ниже) оно увеличивается до 50 раз — 1 мс против 55 мс и более.
Я пытаюсь ускорить вычисления в этой реализации кода Хаффмана. Для небольших входных шестнадцатеричных строк всё работает нормально, но чем длиннее входная строка, тем сильнее растёт время: на достаточно длинной строке (пример ниже) оно увеличивается до 50 раз — 1 мс против 55 мс и более. [code]import time from concurrent.futures import ThreadPoolExecutor from typing import List, Tuple, Optional import numpy as np from array import array import ctypes from line_profiler._line_profiler import byteorder
class Node:
    """A single node of the Huffman tree.

    A node with neither ``left`` nor ``right`` set is a leaf and its
    ``char`` is the decoded symbol; the decoder walks ``left`` on a 0 bit
    and ``right`` on a 1 bit.

    Attributes are declared through ``__slots__`` so instances carry no
    per-instance ``__dict__``.
    """

    __slots__ = ('char', 'freq', 'left', 'right')

    def __init__(
        self,
        char: str,
        freq: int,
        left: Optional["Node"] = None,
        right: Optional["Node"] = None,
    ):
        """Create a node.

        Args:
            char: Symbol stored in this node.
            freq: Frequency (weight) associated with this node.
            left: Child followed on a 0 bit, or ``None``.
            right: Child followed on a 1 bit, or ``None``.
        """
        self.char = char
        self.freq = freq
        self.left = left
        self.right = right
class HybridLookupTable:
"""Hybrid approach combining direct lookup for short codes and binary search for long codes"""
if code_len > 3 bit_offset = start_bit & 7 # Pre-fill buffer for _ in range(8): if bytes_processed < len(data): buffer.add_byte(data[bytes_processed]) bytes_processed += 1 # Skip initial bit offset if bit_offset: buffer.consume_bits(bit_offset)
while pos < end_bit and buffer.bits_in_buffer >= 8: # Try lookup table first (optimized for 8-bit codes) lookup_bits = buffer.peek_bits(8) char_info = self.lookup_table.lookup(lookup_bits, 8)
if char_info: char, code_len = char_info buffer.consume_bits(code_len) result.append(char) pos += code_len else: # Fall back to tree traversal node = self.tree while node.left and node.right and buffer.bits_in_buffer > 0: bit = buffer.peek_bits(1) buffer.consume_bits(1) node = node.right if bit else node.left pos += 1 if not (node.left or node.right): result.append(node.char)
# Refill buffer if needed while buffer.bits_in_buffer 1: freq1, _, node1 = nodes.pop() freq2, _, node2 = nodes.pop()
"""Build lookup table using depth-first traversal"""
if not node: return if not node.left and not node.right: if code: # Never store empty codes self.lookup_table.add_code(code, node.char) return if node.left: self._build_codes(node.left, code + '0') if node.right: self._build_codes(node.right, code + '1')
# Pre-allocate dictionary space self.freqs = {} self.freqs.clear()
# Process all characters in a single loop for _ in range(chars_count): count = int.from_bytes(data[pos:pos + 4], byteorder) char = chr(data[pos + 4]) # Faster than decode self.freqs[char] = count pos += 8 return pos
# Create chunks ensuring they align with byte boundaries when possible for i in range(0, total_bits, chunk_bits): end_bit = min(i + chunk_bits, total_bits) if i > 0: # Align to byte boundary when possible while (i & 7) != 0 and i > 0: i -= 1 chunks.append((i, end_bit))
# Create decoders for each thread decoders = [ ChunkDecoder(self.lookup_table, self.tree, self.chunk_size) for _ in range(len(chunks)) ]
# Process chunks in parallel with ThreadPoolExecutor(max_workers=self.num_threads) as executor: futures = [ executor.submit(decoder.decode_chunk, data, start, end) for decoder, (start, end) in zip(decoders, chunks) ]
# Collect results results = [] for future in futures: chunk_result, _ = future.result() results.extend(chunk_result)
"""Optimized single-threaded decoding for small inputs"""
if total_bits > self.chunk_size: return self._decode_bits_parallel(data, total_bits)
result = [] buffer = BitBuffer() pos = 0 bytes_processed = 0 # Pre-fill buffer while bytes_processed < min(8, len(data)): buffer.add_byte(data[bytes_processed]) bytes_processed += 1 while pos < total_bits: # Use lookup table for common patterns if buffer.bits_in_buffer >= 8: lookup_bits = buffer.peek_bits(8) char_info = self.lookup_table.lookup(lookup_bits, 8)
if char_info: char, code_len = char_info buffer.consume_bits(code_len) result.append(char) pos += code_len else: # Tree traversal for uncommon patterns node = self.tree while node.left and node.right and buffer.bits_in_buffer > 0: bit = buffer.peek_bits(1) buffer.consume_bits(1) node = node.right if bit else node.left pos += 1 if not (node.left or node.right): result.append(node.char)
# Refill buffer while buffer.bits_in_buffer str: # Use numpy for faster hex decoding clean_hex = hex_string.replace(' ', '') data = np.frombuffer(bytes.fromhex(clean_hex), dtype=np.uint8) return self.decode_bytes(data.tobytes())