NAT概述
互联网的迅速发展给大家日常带来了无限便利,互联网依赖于tcp/ip网络,ip地址使得全球计算机得以在Internet上互通互联;ip地址(ipv4)数量有限,到了今天已十分紧缺了;这里介绍一下NAT(Network Address Translation,网络地址转换)是解决公有ip地址有限和内外网安全隔离的技术,讲解NAT特点分类,和提供一个测试程序(python)。
在Internet世界里,区分各个主机是用32bit的ip地址(公网的),主机之间的连接是通过路由器,路由器里面有个路由表用指导数据包如何转发的规则;譬如,A主机要发送一条报文'hello'给Internet某个角落的B主机,那么A先将报文封装上带有目标ip的包头发到A的直接路由器R1上,R1用收到的数据包里获取到包头的目标ip,拿头目标ip在自己的路由表里匹配寻找是否有主机记录或从下一跳路由器发出,最终数据包到达B主机。另外ip地址只能定位到主机,至于是主机上属于哪个进程的呢,有个端口号的整数值区分,属于tcp/ip分层里传输层的一个16bit的值。所以在给网络上某个进程请求或回复数据,ip地址+端口号给定了就能正确送达。 在一个局域网里只有某一台主机N能上网,那么其他主机也想连网怎么办呢,不可能各个去电信拉条宽带吧;假设主机N有公有ip地址ipn,能在Internet上通信;Internet网络底层不外乎ip数据包的收发吧;那么局域网其他主机可以把数据包收发中介都通过主机N代理;局域网每个主机有网部的ip,对应进程有端口,要在Internet上通信,要让大家认识才行,那么在主机N上就把局域网主机ip和端口换成自己的公网ip并分配一个端口,这样就有了一个对应关系localip:localport <-->publicip:publicport。后面接收到的数据也按这个关系逆向替换并转发,这种局域网内的主机就可以和外网通信了,主机N在这个过程做的事情就是网络地址转化NAT。
NAT分类
- NAT1: Full Cone
完全椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收任何外部主机的数据包 - NAT2: Restricted Cone
地址受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,并且能接收曾发送过数据的目标主机的数据包 - NAT3: Port-Restricted Cone
地址和端口受限椎型,相同内部ip:port,不管目标如何都映射相同的外部ip:port,只能接收来自曾发送过的目标地址+端口的数据包 - NAT4: Symmetric
对称型,相同内部ip:port,当目标不同时自身映射出的外部ip:port是不同的。
按本地和公网地址映射规则划分为上面4种,用1~4越来约束越多,当然对内网的隔离保护就越安全,但越到最后能实现p2p穿透通信的难度越大,有NAT4的网络环境基本上不能p2p通信的。p2p即点对点通信,在大文件大流量传输的场景可以减轻服务端网络IO,加快客户收发速度。譬如有一即可通信的服务端在北京,有一个广州的用户和深圳的用户,两个用户要发文件,传统的发送流程数据流都经过北京服务器中转,而p2p通信会在开始时借助北京服务器查询到对方用户的公网ip和端口,后面的数据流直接发送不经北京服务器。NAT1,NAT2,NAT3都是可以打通进行p2p通信的,实际的网络可能很复杂,局域网用户可能经过多层NAT设置代理,接收方亦如是,只要一个是NAT4对称型的就会失败。曾经也见过有人发表过如何通过猜测端口打通对称型NAT设备,我觉得成功率还得看运气成份。
NAT检测
本人出于兴趣开发了个基于python2的NAT检测程序,就一个py文件(服务端/客户端共用,启动时参数不同),先修改好配置,部署两台服务器,然后在所要测试的环境上跑客户端,结果就会在客户端标准输出打印了。
由于检测时需要借助两个公网服务,我这个现成的提供自带以下两个s1,s2,代码默认配置就是这两个,如果自己另外部署就自行修改。
-
s1: cn.cppcloud.cn
-
s2: usa.cppcloud.cn
github地址:
https://github.com/kawloong/NatDetect/
下面也直接贴出代码,不想去github下载就直接copy到本地保存。
快速运行:
python natchk.py c
#! /usr/bin/python
# -*- coding:utf-8 -*-
#
# @Version : 1.0
# @Time : 2018/4/1
# @Author : hejl
# @File : natchk.py
# @Summery : Detect client's local network on which type of NAT
'''
测试用户所在环境的NAT设备类型
NAT1: Full Cone
NAT2: Restricted Cone
NAT3: Port-Restricted Cone
NAT4: Symmetric
拓普模型:
两服务端 + 一客户端,服务端要求有有独立公网IP,至少一台,
客户端在要测试的网络环境机器上执行, 服务端的防火墙需放行相关的UDP端口
C(client)
__________________________|_____________________________
1| ↑8 2| ↑7 ↑6 ↑9 10| ↑11
↓ | ↓ | | | | |
Server1 S1:P1 S1:P2 S1:P3 | | |
|----------------------------------------↑3 4 5 | ↓ |
Server2 +----------------------------------------------------→ S2:P1 -------------→ S2:P2 ------------→ S2:P3
序列图: (UDP响应无顺序性,6/7/8/9先后到达不影响)
c S1:P1 S1:P2 S1:P3 S2:P1 S2:P2 S2:P3
| | | | | | |
|---------->| 1 | | | | |
|-----------+----------->|2 | | | |
|<----------+------------| | | | |
|7 | | | | | |
| |------------+---------->|3 | | |
| |------------+-----------+-------------------->|4 | |
|6 | | | +----------->|5 |
|<----------+------------------------| | | |
|8 | | | | | |
|<----------+ | | | | |
| | | | | | |
<----------+------------+-----------+---------------------+------------| |
|9 | | | | | |
| | | | | | |
| | | | | | |
|-----------+------------+-----------+---------------------+------------+------------->|10
|<----------+------------+-----------+---------------------+------------+--------------|11
| | | | | | |
| | | | | | |
| | | | | | |
响应的rStep(程序内的标识响应的身份)与上面时序标号关系:
rStep1 <---> 8
rStep2 <---> 7
rStep3 <---> 6
rStep4 <---> 9
rStep5 <---> 11
传参:
s1: 服务端1
s2: 服务端2 [可选,没有时会降低准确度]
c: 客户端
运行环境注意:
此工具服务端和客户端都使本同一程序文件,服务端要求有两个公网IP(相应端口不能被防火墙拦截),客户端要求能连通公网;
使用示例:
1. 获得natchk.py文件之后,修改程序开始处的服务器地址
s1_ip='第一台具备公网IP的机器' (本文件85行)
s2_ip='第二台具备公网IP的机器' (本文件90行)
2. 运行服务端程序
第一台上运行 python natchk.py s1
第二台上运行 python natchk.py s2
3. 在待检测的环境运行客户端程序
python natchk.py c
网络正常的话10内打印出检测结果
结果输出样式Test Summery: NAT3 Port Restricted
'''
import threading
import socket
import json
import sys
s1_ip='cn.cppcloud.cn'
s1_port1=17770 # listen and recv+resp
s1_port2=17771 # listen and recv+resp
s1_port3=18770 # sendto
s2_ip='usa.cppcloud.cn'
s2_port1=27770 # listen for s1 notify;
s2_port2=27771 # sendto
s2_port3=28770 # listen and recv+resp # check Symmetric (Last)
c_ip='0'
c_port=0
wait_timeout_ms = 5
def createUdpSock(ip, port):
us = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
if ip > 0 and port > 0:
us.bind( (ip,port) )
print("socket Bind to " + str(us.getsockname()) )
return us
def udpSendTo(us, serv, port, data):
strdata = json.dumps(data)
return us.sendto(strdata, (serv, port))
def udpRecvFrom(us, waitsec, count):
try:
datalst = []
us.settimeout(waitsec)
for i in range(count):
datalst.append(us.recvfrom(1024))
except socket.timeout, e:
pass
return datalst
def loadJsonStr(datastr):
try:
dt = json.loads(datastr)
except ValueError,e:
dt = {}
return dt
# Server-1 Process
def Server1_1():
serv1 = createUdpSock(s1_ip, s1_port1)
serv3 = createUdpSock(s1_ip, s1_port3)
while True:
datastr,addr = serv1.recvfrom(1024)
datadict = loadJsonStr(datastr)
datadict["cli_addr"] = addr[0]
datadict["cli_port"] = addr[1]
s1log = []
# notify Server2 Sendto
if s2_ip:
datadict["rStep"] = 4
nret = udpSendTo(serv1, s2_ip, s2_port1, datadict)
s1log.append('notify S2(%s) nsend=%d' % (s2_ip, nret))
datadict["rStep"] = 3
nret = udpSendTo(serv3, addr[0], addr[1], datadict) # other port response
s1log.append('s1_port3 resp nsend=%d' % nret)
datadict["rStep"] = 1
datadict["msglog"] = s1log
nret = udpSendTo(serv1, addr[0], addr[1], datadict) # response echo
s1log.append('s1_port1 resp nsend=%d' % nret)
print("Serv1-Recv| client="+str(addr)+'| detail=' + ';'.join(s1log))
# Server-1 Process
def Server1_2():
ServEcho(s1_ip, s1_port2, 2)
def ServEcho(servIP, servPort, rStep):
serv = createUdpSock(servIP, servPort)
while True:
datastr,addr = serv.recvfrom(1024)
datadict = loadJsonStr(datastr)
datadict["cli_addr"] = addr[0]
datadict["cli_port"] = addr[1]
datadict["rStep"] = rStep
udpSendTo(serv, addr[0], addr[1], datadict)
# Server-2 Process # listen for Server1's notify
def Server2_1():
serv1 = createUdpSock(s2_ip, s2_port1)
serv2 = createUdpSock(s2_ip, s2_port2)
while True:
datastr,addr = serv1.recvfrom(1024)
datadict = loadJsonStr(datastr)
if not "cli_addr" in datadict:
print('Invalid NotifyMsg:'+datastr)
continue
cliaddr = datadict["cli_addr"]
cliport = datadict["cli_port"]
# datadict["rStep"] = 4
nret = udpSendTo(serv2, cliaddr, cliport, datadict)
print('Serv1(%s) Notify test Client(%s), nsend=%d' % (addr,cliaddr, nret) )
# Server-2 Process # Check Last
def Server2_2():
ServEcho(s2_ip, s2_port3, 5)
# check is valid bind IP addr for server
def isValidServAddr(ipv4):
try:
addrs = socket.getaddrinfo(socket.gethostname(),None)
except:
addrs = []
addrlist = [item[4][0] for item in addrs if ':' not in item[4][0]]
bret = ipv4 in addrlist
if not bret:
try:
stmp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
stmp.bind( (ipv4, 60330) )
bret = True
except BaseException,e:
bret = False
print('IpAddr %s isnot local addr%s'%(ipv4, addrlist))
print(e)
finally:
stmp.close()
return bret
def runS1():
if not isValidServAddr(s1_ip):
return 1
t1 = threading.Thread(target=Server1_1, name='S1-Listen1')
t2 = threading.Thread(target=Server1_2, name='S1-Echo')
t1.start()
t2.start()
t1.join()
t2.join()
return 0
def runS2():
if not isValidServAddr(s2_ip):
return 2
t1 = threading.Thread(target=Server2_1, name='S2-ListenNotify')
t2 = threading.Thread(target=Server2_2, name='S2-Echo')
t1.start()
t2.start()
t1.join()
t2.join()
return 0
# client process
def runClient():
clisock = createUdpSock(c_ip, c_port)
udpSendTo(clisock, s1_ip, s1_port1, {'step': 1})
udpSendTo(clisock, s1_ip, s1_port2, {'step': 2})
print('client Sock is %s' % str(clisock.getsockname()))
# 接收各路响应
resplst1 = udpRecvFrom(clisock, wait_timeout_ms, 4)
resplst2 = []
if s2_ip > '':
udpSendTo(clisock, s2_ip, s2_port3, {'step': 3}) # server-2 echo
resplst2 = udpRecvFrom(clisock, wait_timeout_ms, 1)
clisock.close()
calcSummery(resplst1, resplst2)
def calcSummery(rsp1, rsp2):
rspmap = {}
for item in rsp1+rsp2:
nitem = json.loads(item[0])
nitem['serv'] = item[1]
rspmap[nitem.get('rStep')] = nitem
print(nitem)
if not 1 in rspmap:
print('Server1 not work Or Offline')
return
if s2_ip > '' and not 5 in rspmap:
print('Server2 not work')
return
nat_type = 'unknow'
if s2_ip > '': # 完整的服务
if 4 in rspmap:
nat_type = 'NAT1 Cone'
elif rspmap[1]['cli_port'] != rspmap[5]['cli_port']:
nat_type = 'NAT4 Symmetric'
elif 3 in rspmap:
nat_type = 'NAT2 Address Restricted'
else:
nat_type = 'NAT3 Port Restricted'
else:
if rspmap[1]['cli_port'] != rspmap[2]['cli_port']:
nat_type = 'NAT4 Symmetric'
elif 3 in rspmap:
nat_type = 'NAT2 Address Restricted(Maybe) or NAT1'
else:
nat_type = 'NAT3 Port Restricted(Maybe)'
print("Test Summery: "+nat_type)
if __name__ == "__main__":
param = ''
if len(sys.argv) > 1:
param = sys.argv[1]
runfunlst = {'s1': runS1, 's2': runS2, 'c': runClient}
while not param in runfunlst:
param = raw_input('''Please Select Run Mode:
s1: Server-1 Process
s2: Server-2 Process
c: Client
>>''')
param = param.strip()
run = runfunlst[param]
run()