georgeandgeorge 2019-12-16
TLV协议是一种通讯协议,一般将数据封装成TLV的形式,即Tag,Length,Value。协议就是指通信双方对数据传输控制的一种规定,规定了数据格式,同步方式,传送速度,传送步骤的问题作出统一的规定。可以理解为两个节点之间为了协同工作,协商一定的规则和约定。例如我们会规定字节序,各个字段类型等。
TLV 是一种可变的格式,其中:
T 可以理解为 Tag 或 Type ,用于标识标签或者编码格式信息;L 定义数值的长度;V 表示实际的数值。T 和 L 的长度固定,一般是2或4个字节,V 的长度由 Length 指定。
图例帧格式如下所示:

由于用到这块,我就自己弄了一个python下的仿真代码,这里就给大家demo一下了:
服务端:
import socket
import threading
import pickle
import time
from TLV import *
# 定义保存所有socket的列表
socket_list = []
# 创建socket对象
ss = socket.socket()
# 将socket绑定到本机IP和端口
ss.bind((‘localhost‘, 2333))
# 服务端开始监听来自客户端的连接
ss.listen()
tlv = TLV(t_ext=7, l_ext=7)
def server_target(s):
try:
# 采用循环不断地从socket中读取客户端发送过来的数据
while True:
line = input()
if line is None or line == ‘exit‘:
break
time.sleep(2)
tlv.add(8,line)
data = pickle.dumps(tlv)
s.send(data)
except Exception:
print(Exception.with_traceback())
while True:
# 此行代码会阻塞,将一直等待别人的连接
s, addr = ss.accept()
#socket_list.append(s)
# 每当客户端连接后启动一个线程为该客户端服务
threading.Thread(target=server_target, args=(s, )).start()客户端:
import socket
import threading
import pickle
from TLV import *
# 创建socket对象
s = socket.socket()
# 连接远程主机
s.connect((‘localhost‘, 2333))
def read_from_server(s):
try:
data = pickle.loads(s.recv(2048))
# test
tlvp = TLVParser(data.buffer, t_ext=7, l_ext=7)
for avp in tlvp.parse():
print("%d(%d): %s" % (avp["type"], avp["length"], avp["value"]))
# return s.recv(2048).decode(‘utf-8‘)
return tlvp
# 如果捕获到异常,则表明该socket对应的客户端已经关闭
except:
# 删除该socket
socket_list.remove(s) # ①
def read_server(s):
try:
while True:
contend = read_from_server(s)
if contend is None:
break
except:
print(Exception.with_traceback())
# 客户端启动线程不断地读取来自服务器的数据
threading.Thread(target=read_server, args=(s, )).start() # ①TLV的实现:
from scapy.all import *
class TLVError(Exception):
pass
class TLV:
def __init__(self, tl_in_l=False, t_ext=0, l_ext=0):
self.buffer = ""
self.tl_in_l = tl_in_l
self.t_ext = t_ext
self.l_ext = l_ext
def _int(self, i, ext):
maxi = 1<<8
if ext > 0:
maxi = (1 << ext)
holdstr = ""
holder = i
extend = 0
count = 1
while holder >= maxi:
count += 1
newnum = (holder & (maxi - 1))
holdstr = chr(newnum | extend) + holdstr
extend = maxi
holder /= maxi
holdstr = chr(holder | extend) + holdstr
return holdstr
def _t(self, t):
if self.t_ext == 0 and t > 256:
raise TLVError("type > 256 and no extension bit set")
return self._int(t, self.t_ext)
def _l(self, l):
if self.l_ext == 0 and l > 256:
raise TLVError("length > 256 and no extension bit set")
return self._int(l, self.l_ext)
def add(self, t, v, l=None):
self.buffer += self._t(t)
length = 0 if l is None else l
if self.tl_in_l:
length += t
if l is None:
length += len(v)
self.buffer += self._l(length)
self.buffer += v
def __str__(self):
return self.buffer
def __repr__(self):
return self.buffer
class TLVParser:
def __init__(self, buffer, tl_in_l=False, t_ext=0, l_ext=0):
self.buffer = buffer
self.tl_in_l = tl_in_l
self.t_ext = t_ext
self.l_ext = l_ext
self.offset = 0
def _get_i(self, i_ext):
try:
byte = ord(self.buffer[self.offset])
except IndexError:
raise TLVError("Not enough data")
ext = 1 << (i_ext if i_ext > 0 else 8)
i = 0
while byte & ext:
i += (byte & (ext - 1))
i <<= i_ext
self.offset += 1
try:
byte = ord(self.buffer[self.offset])
except IndexError:
raise TLVError("Not enough data")
i += byte
self.offset += 1
return i
def _get_tlv(self):
t = self._get_i(self.t_ext)
l = self._get_i(self.l_ext)
if self.offset + l > len(self.buffer):
raise TLVError("Buffer not long enough to encompass TLV")
v = self.buffer[self.offset:self.offset+l]
self.offset += l
return (t, l, v)
def parse(self):
while self.offset < len(self.buffer):
t, l, v = self._get_tlv()
yield {
"type": t,
"length": l,
"value": v,
}
# Test/example program for building TLVs and parsing the TLVs
if __name__ == "__main__":
tlv = TLV(t_ext=7, l_ext=7)
tlv.add(10, "Foobar")
tlv.add(16, "Bladibla")
# hexdump(tlv)
tlvp = TLVParser(tlv.buffer, t_ext=7, l_ext=7)
for avp in tlvp.parse():
print ("%d(%d): %s" % (avp["type"], avp["length"], avp["value"]))具体的代码运行结果就不贴了,对懂得python的同学来说,这个很简单的。