Source code for improc.encoding.huffman

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Date    : 2018-05-31 21:38:26
# @Author  : Zhi Liu (zhiliu.mind@gmail.com)
# @Link    : http://iridescent.ink
# @Version : $1.1$


import heapq
import os
from functools import total_ordering

"""
Note: modified from bhrigu.me's code
Code for Huffman Coding, compression and decompression.
Explanation at http://bhrigu.me/blog/2017/01/17/huffman-coding-python-implementation/
- padded at the end with '0'
- padded length (1 byte) is at the beginning of the codes.
"""


@total_ordering
class HeapNode:
    """Node of a Huffman tree, ordered by frequency.

    Leaves carry a symbol in ``char``; a node created with ``char=None``
    is an internal node whose children are attached through ``left`` and
    ``right`` after construction.

    Ordering and equality look only at ``freq``, which is what ``heapq``
    needs to always pop the least frequent node first.  Because ``__eq__``
    is defined without ``__hash__``, instances are not hashable.

    Args:
        char: The symbol stored in this node, or ``None``.
        freq: The weight (occurrence count) of this node.
    """

    def __init__(self, char, freq):
        self.char = char
        self.freq = freq
        self.left = None
        self.right = None

    def __repr__(self):
        return "HeapNode(char={!r}, freq={!r})".format(self.char, self.freq)

    # Comparators: ``total_ordering`` derives <=, > and >= from these two.
    def __lt__(self, other):
        if not isinstance(other, HeapNode):
            # Let Python raise TypeError for unsupported operands instead
            # of failing with AttributeError on ``other.freq``.
            return NotImplemented
        return self.freq < other.freq

    def __eq__(self, other):
        if not isinstance(other, HeapNode):
            # Covers ``None`` as well; ``node == <non-node>`` still
            # evaluates to False through Python's default fallback.
            return NotImplemented
        return self.freq == other.freq
class HuffmanCoding:
    """Huffman compression and decompression of the text file at ``path``.

    Compressed layout: one leading byte holding the number of padding
    bits, followed by the code bits, right-padded with ``'0'`` to a whole
    number of bytes.

    Limitations that follow from the implementation:

    * The code table is kept only in memory (``codes`` and
      ``reverse_mapping``); it is not written to the ``.bin`` file, so
      ``decompress`` only works on an instance whose code table has
      already been built (normally by ``compress``).
    * ``compress`` strips trailing whitespace from the input before
      encoding, so a round trip does not restore it.

    Args:
        path: Path of the text file to compress.  Output paths of both
            ``compress`` and ``decompress`` are derived from it.
    """

    def __init__(self, path):
        self.path = path
        self.heap = []             # priority queue of HeapNode objects
        self.codes = {}            # symbol -> bit string
        self.reverse_mapping = {}  # bit string -> symbol

    # functions for compression:

    def make_frequency_dict(self, text):
        """Return a dict mapping each symbol of ``text`` to its count."""
        frequency = {}
        for character in text:
            frequency[character] = frequency.get(character, 0) + 1
        return frequency

    def make_heap(self, frequency):
        """Push one leaf ``HeapNode`` per entry of ``frequency`` onto the heap."""
        for key, count in frequency.items():
            heapq.heappush(self.heap, HeapNode(key, count))

    def merge_nodes(self):
        """Merge the two lightest nodes repeatedly until one root is left."""
        while len(self.heap) > 1:
            node1 = heapq.heappop(self.heap)
            node2 = heapq.heappop(self.heap)

            merged = HeapNode(None, node1.freq + node2.freq)
            merged.left = node1
            merged.right = node2

            heapq.heappush(self.heap, merged)

    def make_codes_helper(self, root, current_code):
        """Assign a code to every leaf below ``root``.

        Left edges append ``"1"`` and right edges append ``"0"`` to
        ``current_code``.  Results are stored in ``self.codes`` and
        ``self.reverse_mapping``.

        Args:
            root: Root of the (sub)tree to walk; ``None`` is ignored.
            current_code: Bit-string prefix accumulated so far.
        """
        # Explicit stack instead of recursion so very deep (skewed) trees
        # cannot hit the interpreter's recursion limit.  The right child
        # is pushed first so the left subtree is still visited first.
        stack = [(root, current_code)]
        while stack:
            node, code = stack.pop()
            if node is None:
                continue
            if node.char is not None:
                self.codes[node.char] = code
                self.reverse_mapping[code] = node.char
                continue
            stack.append((node.right, code + "0"))
            stack.append((node.left, code + "1"))

    def make_codes(self):
        """Pop the tree root off the heap and build the code tables.

        An empty heap (empty input) is a no-op.  When the tree consists of
        a single leaf (input with one distinct symbol) that symbol gets
        the code ``"0"``; an empty code could not be decoded.
        """
        if not self.heap:
            return
        root = heapq.heappop(self.heap)
        start_code = "0" if root.char is not None else ""
        self.make_codes_helper(root, start_code)

    def get_encoded_text(self, text):
        """Return the concatenated codes of every symbol in ``text``.

        Raises:
            KeyError: If ``text`` contains a symbol without a code.
        """
        return "".join(self.codes[character] for character in text)

    def pad_encoded_text(self, encoded_text):
        """Pad ``encoded_text`` with ``'0'`` to a multiple of 8 bits.

        Between 1 and 8 padding bits are appended (a full byte when the
        input is already byte-aligned), and the padding count is prefixed
        as an 8-bit binary number.

        Returns:
            tuple: ``(padded_text, padded_info)`` where ``padded_info`` is
            the 8-character prefix that was prepended.
        """
        extra_padding = 8 - len(encoded_text) % 8
        padded_info = "{0:08b}".format(extra_padding)
        padded_text = padded_info + encoded_text + "0" * extra_padding
        return padded_text, padded_info

    def get_byte_array(self, padded_encoded_text):
        """Convert a string of ``'0'``/``'1'`` into a ``bytearray``.

        Raises:
            ValueError: If the length is not a multiple of 8 (or a chunk
                is not valid binary).
        """
        if len(padded_encoded_text) % 8 != 0:
            # Library code must not terminate the interpreter; report the
            # problem to the caller instead.
            raise ValueError("Encoded text not padded properly")

        return bytearray(
            int(padded_encoded_text[i:i + 8], 2)
            for i in range(0, len(padded_encoded_text), 8)
        )

    def compress(self):
        """Compress ``self.path`` into a sibling ``.bin`` file.

        Trailing whitespace of the input is stripped before encoding.

        Returns:
            str: Path of the written file (``self.path`` with its
            extension replaced by ``.bin``).
        """
        filename, _ = os.path.splitext(self.path)
        output_path = filename + ".bin"

        # Read-only access is enough for the input.
        with open(self.path, 'r') as file:
            text = file.read()
        text = text.rstrip()

        frequency = self.make_frequency_dict(text)
        self.make_heap(frequency)
        self.merge_nodes()
        self.make_codes()

        encoded_text = self.get_encoded_text(text)
        padded_encoded_text, _ = self.pad_encoded_text(encoded_text)
        b = self.get_byte_array(padded_encoded_text)

        # Open the output only once encoding has succeeded, so a failure
        # above does not leave an empty file behind.
        with open(output_path, 'wb') as output:
            output.write(bytes(b))

        print("Compressed")
        return output_path

    # functions for decompression:

    def remove_padding(self, padded_encoded_text):
        """Strip the 8-bit padding header and the trailing padding bits.

        Raises:
            ValueError: If the first 8 characters are not a binary number
                (for example when the input is empty).
        """
        extra_padding = int(padded_encoded_text[:8], 2)
        encoded_text = padded_encoded_text[8:]
        if extra_padding:
            # ``[:-0]`` would return an empty string, so slice only when
            # there is padding to drop.
            encoded_text = encoded_text[:-extra_padding]
        return encoded_text

    def decode_text(self, encoded_text):
        """Decode a bit string using ``self.reverse_mapping``.

        Bits are accumulated until they match a known code; trailing bits
        that never complete a code are ignored.
        """
        decoded = []
        current_code = ""
        for bit in encoded_text:
            current_code += bit
            if current_code in self.reverse_mapping:
                decoded.append(self.reverse_mapping[current_code])
                current_code = ""
        return "".join(decoded)

    def decompress(self, input_path):
        """Decompress ``input_path`` using this instance's code table.

        The code table must already be populated (see the class
        docstring).

        Args:
            input_path: Path of a file produced by ``compress``.

        Returns:
            str: Path of the written text file, derived from
            ``self.path`` as ``<name>_decompressed.txt``.
        """
        filename, _ = os.path.splitext(self.path)
        output_path = filename + "_decompressed" + ".txt"

        with open(input_path, 'rb') as file:
            data = file.read()

        # One 8-character binary group per input byte.
        bit_string = "".join(format(byte, "08b") for byte in bytearray(data))

        encoded_text = self.remove_padding(bit_string)
        decompressed_text = self.decode_text(encoded_text)

        with open(output_path, 'w') as output:
            output.write(decompressed_text)

        print("Decompressed")
        return output_path