Python实现双端通信协议解析及序列化和反序列化

使用 Python 实现类似 Protocol Buffers 的通信协议解析及序列化和反序列化等功能

题目详情

原题截图

题解思路

题解思路见后文源码中的注释,基本上对每一步逻辑都作了详细解释;
关于压缩的算法单独提一下,一开始的压缩策略是将0~9数字映射为英文字符,然后和并行相邻的字符,代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
## ================压缩================
def dumpComp(self, name, obj):
def GenUnitSubStr(count, char):
return "%d%s" % (count, chr(int(char, 16) + ord('a'))) if count > 1 else chr(int(char, 16) + ord('a'))

raw_result = self.dumps(name, obj)
comp_result = ""
curr_count = 0
curr_char = raw_result[0]
# 合并最长的相邻同字符的子串,如"000111"合并为"3a2b"
# 将 0-f 映射到 a-p,如果合并后子串长度为1,则不保留长度信息
for char in raw_result:
if curr_char == char:
curr_count += 1
else:
comp_result += GenUnitSubStr(curr_count, curr_char)
curr_count = 1
curr_char = char
comp_result += GenUnitSubStr(curr_count, curr_char)
return comp_result

## ================解压================
def loadComp(self, name, s):
raw_str = ""
idx_comp_str = 0
while idx_comp_str < len(s):
if s[idx_comp_str].isdigit():
idx_start = idx_comp_str
while s[idx_comp_str].isdigit():
idx_comp_str += 1
raw_str += "".ljust(int(s[idx_start : idx_comp_str]), hex(ord(s[idx_comp_str]) - ord('a'))[2:])
else:
raw_str += hex(ord(s[idx_comp_str]) - ord('a'))[2:]
idx_comp_str += 1
return self.loads(name, raw_str)

不过最终的数据压缩没有达到OJ系统满意的压缩比,并且耗时增加了将近一倍:

第一次提交OJ提示

第一次提交OJ数据

考虑到可能是由于相邻相同字符出现的概率较小,于是更换了LZW压缩算法,也就是后文中的压缩算法实现,关于此算法的详解,可以参考这篇文章:LZW压缩算法原理解析 - SegmentFault 思否

代码结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
# -*- coding: utf8 -*-

import struct

class ProtoParser(object):

def __CheckNullChar(self, char):
return not len(char.strip())

def __HandleProtoName(self):
# 协议名起始下标
__idx_begin = self.__idx_content
# 扫描器下标推进直到遇到左花括号
while self.__proto_content[self.__idx_content] != '{':
self.__idx_content += 1
# 创建新的协议信息列表,记录成员信息
self.__curr_proto = self.__proto_content[__idx_begin : self.__idx_content].strip()
self.__proto_list[self.__curr_proto] = []
# 推进下标到左花括号下一个字符
self.__idx_content += 1
# 扫描器下标推进跳过空字符
while self.__CheckNullChar(self.__proto_content[self.__idx_content]):
self.__idx_content += 1
# 跳转状态机到变量类型处理状态
self.__tokenize_status = "VAR_TYPE"

def __HandleVarType(self):
# 变量类型起始下标
__idx_begin = self.__idx_content
# 推进下标直到遇到空字符或左方括号
while not self.__CheckNullChar(self.__proto_content[self.__idx_content]) \
and self.__proto_content[self.__idx_content] != '[':
self.__idx_content += 1
# 协议成员数据对象,变量类型,是否为数组,数组长度
member = {"Type" : self.__proto_content[__idx_begin : self.__idx_content],
"IsArray" : False, "Len" : -1}
# 如果当前下标字符为左括号,则按照数组语法继续解析变量类型
if self.__proto_content[self.__idx_content] == '[':
member["IsArray"] = True
# 下标推进到下一个字符
self.__idx_content += 1
__idx_begin = self.__idx_content
# 推进下标直到遇到右方括号
while self.__proto_content[self.__idx_content] != ']':
self.__idx_content += 1
# 根据数组长度字符串解析数组长度,-1 为变长数组
str_ary_len = self.__proto_content[__idx_begin : self.__idx_content].strip()
if len(str_ary_len) != 0:
member["Len"] = int(str_ary_len)
# 下标推进至右方括号后一个字符
self.__idx_content += 1
# 否则继续推进下标直到字符为非空
else:
while self.__CheckNullChar(self.__proto_content[self.__idx_content]):
self.__idx_content += 1
# 将当前标签对象存储到正在编译的协议 Member 列表中
self.__proto_list[self.__curr_proto].append(member)
# 跳转状态机到变量名处理状态
self.__tokenize_status = "VAR_NAME"

def __HandleVarName(self):
# 变量名起始下标
__idx_begin = self.__idx_content
# 推进下标直到遇到分号
while self.__proto_content[self.__idx_content] != ';':
self.__idx_content += 1
# 将变量名信息添加到当前正在编译的协议最后一条变量信息中
self.__proto_list[self.__curr_proto][-1]["Name"] \
= self.__proto_content[__idx_begin : self.__idx_content].strip()
# 下标推进至分号后一个字符
self.__idx_content += 1
# 继续推进下标检查后续内容,跳转对应的处理状态
while self.__CheckNullChar(self.__proto_content[self.__idx_content]):
self.__idx_content += 1
# 如果下一个非空字符为右花括号,则当前协议体编译完毕
if self.__proto_content[self.__idx_content] == "}":
# 继续推进直到遇到英文字符或下划线,或者文本结束
while True:
self.__idx_content += 1
if self.__idx_content >= len(self.__proto_content):
# 跳转状态至退出状态
self.__tokenize_status = "EXIT"
break
# 如果遇到非空字符并且当前字符不是分号,则跳转状态至协议名处理状态
if not self.__CheckNullChar(self.__proto_content[self.__idx_content]) \
and self.__proto_content[self.__idx_content] != ';':
self.__tokenize_status = "PROTO_NAME"
break
# 否则本协议体尚未编译完成,跳转至变量类型处理状态
else:
self.__tokenize_status = "VAR_TYPE"

def __HandleTypeInt8(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<b', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<b', param[0:2].decode("hex"))[0], param[2:]
else:
raise ValueError

def __HandleTypeUInt8(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<B', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<B', param[0:2].decode("hex"))[0], param[2:]
else:
raise ValueError

def __HandleTypeInt16(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<h', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<h', param[0:4].decode("hex"))[0], param[4:]
else:
raise ValueError

def __HandleTypeUInt16(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<H', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<H', param[0:4].decode("hex"))[0], param[4:]
else:
raise ValueError

def __HandleTypeInt32(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<i', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<i', param[0:8].decode("hex"))[0], param[8:]
else:
raise ValueError

def __HandleTypeUInt32(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<I', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<I', param[0:8].decode("hex"))[0], param[8:]
else:
raise ValueError

def __HandleTypeFloat(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<f', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<f', param[0:8].decode("hex"))[0], param[8:]
else:
raise ValueError

def __HandleTypeDouble(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<d', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<d', param[0:16].decode("hex"))[0], param[16:]
else:
raise ValueError

def __HandleTypeBool(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<?', param).encode("hex")
elif op_type == "LOAD":
return struct.unpack('<?', param[0:2].decode("hex"))[0], param[2:]
else:
raise ValueError

def __HandleTypeString(self, op_type, param):
if op_type == "DUMP":
return struct.pack('<h', len(param)).encode("hex") + param.encode("hex")
elif op_type == "LOAD":
length = struct.unpack('<h', param[0:4].decode("hex"))[0]
return param[4 : 4 + 2 * length].decode("hex"), param[4 + 2 * length:]
else:
raise ValueError

# 协议文本标签化状态机:协议名处理状态,变量类型处理状态,变量名处理状态,退出状态
TokenizeFSM = {"PROTO_NAME" : __HandleProtoName, "VAR_TYPE" : __HandleVarType,
"VAR_NAME" : __HandleVarName, "EXIT" : None}

# 内置变量类型及对应的序列化和反序列化回调函数
BasicTypeBuiltIn = {"int8" : __HandleTypeInt8, "uint8" : __HandleTypeUInt8, "int16" : __HandleTypeInt16,
"uint16" : __HandleTypeUInt16, "int32" : __HandleTypeInt32, "uint32" : __HandleTypeUInt32,
"float" : __HandleTypeFloat, "double" : __HandleTypeDouble, "bool" : __HandleTypeBool,
"string" : __HandleTypeString}

def __init__(self):
self.__proto_list = {} # 协议文本编译完成后的标签化信息
self.__idx_content = 0 # 编译扫描器下标
self.__curr_proto = "" # 当前正在编译的协议
self.__proto_content = "" # 协议文本内容
self.__tokenize_status = "PROTO_NAME" # 标签化状态机当前状态

## ================编译协议================
def buildDesc(self, file):
with open(file, "r") as proto_file:
self.__proto_content = proto_file.read().strip()
self.__idx_content = 0 # 重置扫描器下标
while self.__tokenize_status != "EXIT":
ProtoParser.TokenizeFSM[self.__tokenize_status](self)

# DEBUG - 输出编译完成后的协议信息
# for proto_name, proto_data in self.__proto_list.items():
# print proto_name + "\n".ljust(22, '=')
# for member in proto_data:
# print (member["Type"], member["Name"], member["IsArray"], member["Len"])
# print '\n'


## ================序列化================
def dumps(self, name, d):
# 序列化后字符串
str_result = ""
# 遍历指定名称协议的成员列表
for member in self.__proto_list[name]:
# 如果该成员为数组,则对内部元素逐个进行序列化
if member["IsArray"]:
# 对数组长度进行检查s
if member["Len"] > 0:
if len(d[member["Name"]]) != member["Len"]:
raise IndexError
else:
str_result += struct.pack('<h', len(d[member["Name"]])).encode("hex")
for value in d[member["Name"]]:
# 如果该成员类型为内置基本类型,则直接调用该类型的处理函数,处理字典中对应名称字段数据;否则视为组合类型,进行递归序列化
str_result += ProtoParser.BasicTypeBuiltIn[member["Type"]](self, "DUMP", value) \
if ProtoParser.BasicTypeBuiltIn.get(member["Type"]) \
else self.dumps(member["Type"], value)
# 否则直接序列化该成员
else:
str_result += ProtoParser.BasicTypeBuiltIn[member["Type"]](self, "DUMP", d[member["Name"]]) \
if ProtoParser.BasicTypeBuiltIn.get(member["Type"]) \
else self.dumps(member["Type"], d[member["Name"]])
return str_result

## ================反序列化================
def loads(self, name, s):
def RecursiveLoad(_name, _s):
# 反序列化后的字典
dict_result = {}
# 遍历指定名称协议的成员列表
for member in self.__proto_list[_name]:
# 如果该成员为数组,则对内部元素逐个进行反序列化
if member["IsArray"]:
dict_result[member["Name"]] = ()
length = member["Len"]
# 如果为变长数组,则解析其长度
if member["Len"] < 0:
length, _s = struct.unpack('<h', _s[0:4].decode("hex"))[0], _s[4:]
for _ in xrange(length):
# 根据该成员类型是否为内置基本类型决定反序列化策略,与序列化逻辑相似
value, _s = ProtoParser.BasicTypeBuiltIn[member["Type"]](self, "LOAD", _s) \
if ProtoParser.BasicTypeBuiltIn.get(member["Type"]) else RecursiveLoad(member["Type"], _s)
dict_result[member["Name"]] += (value, )
# 否则直接反序列化该成员
else:
dict_result[member["Name"]], _s = ProtoParser.BasicTypeBuiltIn[member["Type"]](self, "LOAD", _s) \
if ProtoParser.BasicTypeBuiltIn.get(member["Type"]) else RecursiveLoad(member["Type"], _s)
return dict_result, _s

return RecursiveLoad(name, s)[0]

# 使用LZW算法进行压缩和解压,字典长度256:
# '0' -> '\x00', '1' -> '\x01', '2' -> '\x02', ...

## ================压缩================
def dumpComp(self, name, obj):
raw_result = self.dumps(name, obj)
str_result = "" # 压缩后的结果字符串
idx_raw_str = 0 # 压缩前的字符串索引
str_prefix = "" # 前缀字符串
dict_map = {} # 编码映射字典
# 将单字符映射添加到字典中
for i in xrange(16):
dict_map[hex(i)[-1]] = chr(i)
# 执行编码过程
while idx_raw_str < len(raw_result):
# 使用当前位置的字符增长前缀字符串
str_prefix += raw_result[idx_raw_str]
# 如果当前前缀字符串不存在于字典中,
# 输出增长前的前缀字符串,新增字典键值,并将前缀字符串更新为当前字符
if not dict_map.get(str_prefix):
str_result += dict_map[str_prefix[:-1]]
# 检查字典长度,防止ASCII编码溢出
if len(dict_map) < 256:
dict_map[str_prefix] = chr(len(dict_map))
str_prefix = raw_result[idx_raw_str]
idx_raw_str += 1
str_result += dict_map[str_prefix]
return str_result

## ================解压================
def loadComp(self, name, s):
dict_map = {} # 解码映射字典
for i in xrange(16): # 将单字符映射添加到字典中
dict_map[chr(i)] = hex(i)[-1]
idx_comp_str = 1 # 压缩后的字符串索引
symbol_current = s[0] # 初始化当前符号为第一个字符
str_result = dict_map[symbol_current] # 解压缩后的结果字符串
# 执行解码过程
while idx_comp_str < len(s):
symbol_prefix = symbol_current
symbol_current = s[idx_comp_str]
# 如果当前符号位于映射字典中,输出解码后的数据,并生成新的映射键值
if dict_map.get(symbol_current):
str_result += dict_map[symbol_current]
dict_map[chr(len(dict_map))] = dict_map[symbol_prefix] + dict_map[symbol_current][0]
# 否则将当前符号定义为新映射,并输出解码后的数据
else:
dict_map[chr(len(dict_map))] = dict_map[symbol_prefix] + dict_map[symbol_prefix][0]
str_result += dict_map[symbol_prefix] + dict_map[symbol_prefix][0]
idx_comp_str += 1
return self.loads(name, str_result)

第二次提交OJ提示

第二次提交OJ数据

作者

Voidmatrix

发布于

2022-01-20

更新于

2022-01-29

许可协议

评论