2019.05.03

## Test 1

• Input: 路径：各字母出现频率（含 Space），保存于 CSV 文件中
• Output: 字母的信息熵
文件目录:
information_theory_experiment_test1
|--- entropy_cal.py
|--- letter_frequence.csv
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test 1"""

import pandas as pd
import math

class Solution(object):
def calInfoEntropy(self, path):
"""
:type path: str
"""
result = 0

for i in range(0, data.shape[0] - 1):
num_tem = data.iat[i, 1]
result += num_tem * math.log2(1 / num_tem)

return result


Pandas 自带读取 CSV 文件功能，读取结果为矩阵，CSV 文件中第一行会被识别为列名。取数函数有多种：

• loc，基于列 label，可选取特定行（根据行 index）
• iloc，基于行/列的 position
• at，根据指定行 index 及列 label，快速定位 DataFrame 的元素
• iat，与 at 类似，不同的是根据 position 来定位的
• ix，为 loc 与 iloc 的混合体，既支持 label 也支持 position

## Test 2

• Input: 路径：各字母出现频率（含 Space），信道矩阵，分别保存于 CSV 文件中
• Output: 字母的噪音熵和互信息
文件目录:
information_theory_experiment_test2
|--- entropy_cal.py
|--- letter_frequence.csv
|--- communication_channel.csv
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test 2"""

import pandas as pd
import numpy as np
import math

conditional_entropy = 0
mutual_information = 0

# input data
joint_probability_distribution = np.zeros(shape=(27, 27))

# cal the conditional entropy
for i in range(0, 26):
for j in range(0, 26):
if comm_channel.iat[i, j] != 0:
joint_probability_distribution[i][j] = letter_fre_x.iat[j, 1] * comm_channel.iat[i, j]
conditional_entropy -= \
joint_probability_distribution[i][j] * math.log2(comm_channel.iat[i, j])
else:
continue

letter_fre_y = np.sum(joint_probability_distribution, axis=0)

# cal the mutual information
for i in range(0, 26):
for j in range(0, 26):
if comm_channel.iat[i, j] != 0:
mutual_information += \
joint_probability_distribution[i][j] * \
math.log2(joint_probability_distribution[i][j] / (letter_fre_x.iat[j, 1] * letter_fre_y[j]))
else:
continue

print('The conditional entropy is: ', conditional_entropy, ' bit')
print('The mutual information is: ', mutual_information, 'bit')


## Test 3

The fundamental problem of communication is that of reproducing at one point either exactly or approximately a message selected at another point.

• Input: 各字母出现频率（含 Space），欲 Huffman 编码的句子
• Output: Huffman 编码结果（使用结果编码句子在这里暂时省略，Test 5 会提到）
文件目录:
information_theory_experiment_test3
|--- huffman_coding.py
|--- letter_frequence.csv
|--- sentence.docx
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test 3"""

import queue
import pandas as pd

class HuffmanNode(object):
def __init__(self, left=None, right=None, root=None):
self.left = left
self.right = right
def children(self):
return((self.left, self.right))

def create_tree(frequencies):
p = queue.PriorityQueue()
for value in frequencies:
p.put(value)
while p.qsize() > 1:
l, r = p.get(), p.get()
node = HuffmanNode(l, r)
p.put((l[0]+r[0], node))
return p.get()

node = create_tree(letter_fre)

def walk_tree(node, prefix="", code={}):
"""
node 是一个tuple(freq, HuffmanNode|character)
"""
if isinstance(node[1], HuffmanNode):
code1 = walk_tree(node[1].left, '0', code.copy())
code2 = walk_tree(node[1].right, '1', code.copy())
if len(code1) > 0:
for k, v in code1.items():
code[k] = prefix + v
if len(code2) > 0:
for k, v in code2.items():
code[k] = prefix + v
else:
code[node[1]] = prefix
return(code)

code = walk_tree(node)

for i in sorted(freq, reverse=True):
try:
print(i[1], '{:6.2f}'.format(i[0]), code[i[1]])
except Exception as e:
print(e)
continue


## Test 4

P(Y=A|X=B)=P(Y=B|X=B)=P(Y=C|X=B)=1/3

• Input: 各字母出现频率（含 Space），信道转移矩阵
• Output: 信道容量
文件目录:
information_theory_experiment_test4
|--- channel_capacity_cal.py
|--- letter_frequence.csv
|--- communication_channel.csv


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test 4 - Error Example"""

import pandas as pd
import numpy as np
import math

def channel_capacity_cal(p, q, f, m):
"""the function to cal the channel capacity"""
tem_precision_x = p
channel_cap1 = 0
channel_cap2 = 0

for i in range(1, m):
sum = 0
row = q.shape[0]
column = q.shape[1]
rev_transfer_matrix = np.zeros(row, column)
out_precision = np.zeros(1, column)
tem_precision_y = np.zeros(1, row)

for j in range(1, column):
tem_sum = 0
for k in range(1, row):
tem_sum += q(k, j) * tem_precision_x(k, 1)
out_precision.at[1, i] = tem_sum
for j in range(1, row):
for k in range(1, column):
rev_transfer_matrix.at[j, k] = q.iat[j, k] * tem_precision_x.iat[j, 1] / out_precision.at[1, k]
for j in range(1, row):
tem_sum = 0
for k in range(1, column):
if rev_transfer_matrix.at[j, k] != 0:
tem_sum += q.iat[j, k] * math.log2(rev_transfer_matrix.at[j, k])
tem_precision_y.at[0, j] = math.pow(2, tem_sum)
sum += tem_precision_y.at[0, j]
for j in range(1, row):
tem_precision_y.at[0, j] /= sum
channel_cap2 = math.log2(sum)

print(channel_cap2)
if abs(channel_cap1 - channel_cap2) < f:
channel_cap1 = channel_cap2
tem_precision_x = tem_precision_y
print("the iterations times is: " + i)
break
else:
channel_cap1 = channel_cap2
tem_precision_x = tem_precision_y

print(tem_precision_x)
return channel_cap1

max_iterations = 100
precision = 0.001
channel_capacity_cal(letter_fre, channel_transition_probability, precision, max_iterations)


import numpy as np

precision = 0.001

def initiate_prob_distrib(p):
p_x = np.transpose(np.ones((1, p.shape[0])) / p.shape[0])
print("初始化概率分布为:\n", p_x)

return p_x

p_x = initiate_prob_distrib(channel_transition_probability)

def iteration(p_i, p_ij, k):
q_j = np.sum(p_i * p_ij, axis=0)
print("NO. " + str(k) + " iteration: q_j:\n", q_j)

alpha_i = np.exp(np.sum(p_ij * np.log(p_ij / q_j), axis=1))
alpha_i = np.expand_dims(alpha_i, axis=0)
print("NO. " + str(k) + " iteration: alpha_i:\n", alpha_i)

u = np.matmul(alpha_i, p_i)[0]
print("NO. " + str(k) + " iteration: u:\n", u)

I_L = np.log2(u)[0]
print("NO. " + str(k) + " iteration: I_L:\n", I_L)

I_U = np.log2(np.amax(alpha_i))
print("NO. " + str(k) + " iteration: I_U:\n", I_U)

if I_U - I_L < precision:
print("capacity:\n ", I_L)
print("input letter frequency:\n ", p_i)
return True, I_L, p_i
else:
p_i = p_i * np.transpose(alpha_i) / u[0]
print("No." + str(k) + "Update Probability distributions:\n", p_i)
return False, p_i

flag, I_L, p_i = iteration(p_x, channel_transition_probability, 1)

ans_dict = dict()
for p in (channel_transition_probability1, channel_transition_probability2, channel_transition_probability3, channel_transition_probability4):
flag = False
p_x = initiate_prob_distrib(p)
k = 1
while not flag:
flag, ans, tmp_p = iteration(p_x, p, k)
if not flag:
p_x = tmp_p
k = k + 1
else:
ans_dict[ans] = tmp_p

for k, v in ans_dict.items() :
print("capacity：", k)
print("input letter frequency：\n", np.transpose(v)[0])


## Test 5

1. 基于 Test 3 的程序，修改为构造 0, 1 二元 Huffman 编码程序

2. 使用上述二元 Huffman 编码程序为下一段文字进行编码

The fundamental problem of communication is that of reproducing at one point either exactly or approximately a message selected at another point.

3. 使用给定的生成矩阵，对上述编码结果进行信道编码（Hamming 编码）

4. 任意修改一定量上述编码结果，之后对此修改之后的编码使用校验矩阵，进行信道译码（Hamming 译码）

Test 5 在整合前面的实验之后加入信道编码部分。

文件目录:
information_theory_experiment_test5
|--- run.py - 主程序
|--- huffman_coding.py - 实现 Huffman 编码
|--- hamming_coding.py - 实现 Hamming 编码
|--- hamming_decoding.py - 实现 Hamming 译码
|--- article.docx - 编码文字文档
|--- hamming_coding.csv - 生成矩阵
|--- hamming_decoding.csv - 校验矩阵
|--- letter_frequecny.csv - 字母频率
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Test_5/run.py"""

import huffman_coding
import hamming_coding
import hamming_decoding

huffman_coding_result = huffman_coding.huffman_coding()
hamming_coding_result = hamming_coding.hamming_coding(huffman_coding_result)
hamming_decoding.hamming_decoding(huffman_coding_result, hamming_coding_result)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>

"""Test_5/huffman_coding.py"""

import pandas as pd
import docx

def huffman_coding(letter_fre):
file = docx.opendocx("article.docx")
doc = docx.getdocumenttext(file)
huffman_coding_result = ""
length = 0

huffman = ()
for i in range(0, 26):
tem = ('',)
huffman += tem

for loop in range(1, 12):
flag = 26 - loop * 2
letter_fre.sort_values(by='probability')
for i in range(97, 122):
if letter_fre.iat[flag, 0] == chr(i):
huffman[i - 96] += '0'
if letter_fre.iat[flag + 1, 0] == chr(i):
huffman[i - 96] += '1'
if letter_fre.iat[flag + 2, 0] == chr(i):
huffman[i - 96] += '2'
if letter_fre.iat[flag, 0] == ' ':
huffman[i - 96] += '0'
if letter_fre.iat[flag + 1, 0] == ' ':
huffman[i - 96] += '1'
if letter_fre.iat[flag + 2, 0] == ' ':
huffman[i - 96] += '2'
sum_tem = 0
for i in range(flag, flag + 2):
sum_tem += float(letter_fre.iat[i, 1])
letter_fre.iat[i, 1] = str(-1)
letter_fre.iat[flag, 1] = str(sum_tem)

print('The huffman code of such letters are as follow: ')
for i in range(0, 26):
length += huffman[i].length()
print(huffman[i])
print('The average length of huffman code is: ')
print(length / 27)

print('The sentence was translated to huffman code as follow: ')
for i in range(0, len(doc) - 1):
flag = 0
for j in range(97, 122):
if doc[i] == chr(j):
print(huffman[j - 96])
flag = 1
if flag == 0:
print(huffman[0])

for i in range(0, len(doc)):
tem = ord(doc[i])
if tem == 32:
huffman_coding_result += huffman[0]
elif tem >= 65 & tem <= 90:
huffman_coding_result += huffman[tem - 64]
elif tem >= 97 & tem <= 122:
huffman_coding_result += huffman[tem - 96]

print("The result of huffman coding: " + huffman_coding_result)
return huffman_coding_result
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>

"""Test_5/hamming_coding.py"""

import pandas as pd
import numpy as np
import docx

def hamming_coding(huffman_coding_result):
code_length = len(huffman_coding_result)
remainder = code_length % 4
b = [0, 0, 0, 0]
hamming_coding_result = ""
s = np.zeros(int((code_length - remainder) / 4 + 1), 7)

if remainder != 0:
for i in range(0, 3 - remainder):
huffman_coding_result += '0'
for i in range(0, int((code_length - remainder) / 4) - 1):
b[0] = ord(huffman_coding_result[4 * i]) - 48
b[1] = ord(huffman_coding_result[4 * i + 1]) - 48
b[2] = ord(huffman_coding_result[4 * i + 2]) - 48
b[3] = ord(huffman_coding_result[4 * i + 3]) - 48
c = np.dot(b, hamming_coding_list)
for j in range(0, 6):
if c[j] == 0 | c[j] == 2:
s[i, j] = 0
if c[j] == 1 | c[j] == 3:
s[i, j] = 1
s[51, 2] -= 1
s[34, 2] -= 1

for i in range(0, int((code_length - remainder) / 4) - 1):
for j in range(0, 3):
hamming_coding_result += chr(s[i, j])
for i in range(0, remainder - 1):
hamming_coding_result += chr(s[int((code_length - remainder) / 4), i])

print(hamming_coding_result)
return s
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Copyright (c) 2019 - magic_kurakun <magickurakun@gmail.com>

"""Test_5/hamming_decoding.py"""

import pandas as pd
import numpy as np
import docx

def hamming_decoding(huffman_coding_result, s):
code_length = len(huffman_coding_result)
remainder = code_length % 4
d = np.zeros(int((code_length - remainder) / 4), 3)
hamming_decoding_result = ""
hamming_decoding_result_tem = ""

for i in range(0, int((code_length - remainder) / 4) - 1):
c = np.dot(s[i, :], hamming_decoding_list)
for j in range(0, 2):
if c[j] == 0 | c[j] == 2:
d[i, j] = 0
elif c[j] == 1 | c[j] == 3:
d[i, j] = 1
for j in range(0, 3):
if d[i, 0] == hamming_decoding_list[j, 0] & d[i, 1] == hamming_decoding_list[j, 2] &\
d[i, 2] == hamming_decoding_list[j, 3]:
s[i, j] -= 1
for i in range(0, int((code_length - remainder) / 4) - 1):
for j in range(0, 3):
hamming_decoding_result_tem += chr(s[i, j])
for i in range(0, remainder - 1):
hamming_decoding_result_tem += chr(s[int((code_length - remainder) / 4), i])

decode_length = len(hamming_decoding_result_tem)
flag = 1
break_while = 0
while flag < decode_length - 3 + remainder:
for i in range(0, 26):
str_tem = hamming_decoding_result_tem[flag:flag + i - 1]
for j in range(0, 26):
if str_tem == huffman_coding_list[j, :]:
if j == 1:
hamming_decoding_result += chr(32)
else:
hamming_decoding_result += chr(j + 96)
flag += len(str_tem)
break_while = 1
break
if break_while == 1:
break

print(hamming_decoding_result)