V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
探索世界的好奇心万岁
a719114136
V2EX  ›  分享发现

判断 ip 是否在一个超大 ip 集中(识别国内 ip)

  •  
  •   a719114136 · 158 天前 · 3248 次点击
    这是一个创建于 158 天前的主题,其中的信息可能已经有所发展或是发生改变。

    原文地址: https://www.ikaze.cn/article/65

    前几天发帖/t/785801讨论匹配大量 ip 的方法,经过几天的思考,把最终使用的方法分享给大家。

    1. 通过 ip 位置数据库

    比较有名的服务商有:ipip(付费), maxmind (付费),纯真 (免费)。

    但在这个应用场景下,我们并不需要具体的位置信息,类似的方案会浪费不必要的内存因此放弃。

    2. 利用 ip 的连续性

    后面两个方法有个前提:ip 地址列表中大部分是连续的。

    这里我们已有了国内 ip 地址列表(已有开源的库,很好找,另外我用的这个库已经把 ip 合并为了 CIDR 格式的地址)。

    我们先通过二进制把 ip 转为可直接比较的数字,再把连续的 ip 变为 (start_ip, end_ip) 这样的集合,就可以利用二分法快速查找了。

    import ipcalc
    
    class ChinaIp:
        def __init__(self):
            self.data = []
    
        def load(self, cidr_file='data/china_ip_list.txt'):
            with open(cidr_file, 'r')as f:
                for s in f.readlines():
                    self.add(s.strip())
    
        def add(self, cidr):
            n = ipcalc.Network(cidr)
            self.data.append((n.host_first().ip, n.host_last().ip))
    
        def search(self, ip):
    
            l = 0
            r = len(self.data) - 1
            while l <= r:
                mid = (l + r) // 2
                if self.data[mid][0] <= ip <= self.data[mid][1]:
                    return True
                elif self.data[mid][0] > ip:
                    r = mid - 1
                elif self.data[mid][1] < ip:
                    l = mid + 1
                else:
                    return False
            return False
    
        def __contains__(self, item):
            ip = ipcalc.IP(item).ip
            return self.search(ip)
            
    china_ip = ChinaIp()
    china_ip.load()
    print('223.70.163.83' in china_ip)
    

    3. 利用 CIDR 的特性

    CIDR 是形如 x.x.x.x/n 这样的地址,它表示一组网络地址相同的 ip,其中 n 表示前 n 位作为网络地址。

    根据 CIDR 的特性,我们可以得到这样的结论:同一 CIDR 下的 ip,其网络地址是相同的。

    因此我们可以把所有国内 cidr 地址的网络地址取出,放字典;对于一个 ip,尝试可能的网络地址(即 n ),看其是否在字典中。

    import ipcalc
    
    class ChinaIp(object):
        def __init__(self):
            self.data = {}
    
    
        def load(self, cidr_files='data/china_ip_list.txt'):
            with open(cidr_files, 'r')as f:
                cidr_list = f.readlines()
    
            for cidr in cidr_list:
                self.insert(cidr.strip())
    
        def insert(self, cidr):
            network = ipcalc.Network(cidr)
            self.data[str(network.netmask())]=True
    
        def search(self, netmask: str) -> bool:
    
            for i in netmask.split('.'):
                node = node.children.get(i, None)
                if node is None:
                    return False
                if node.is_end:
                    return True
            return False
    
        def __contains__(self, ip):
            for i in range(1,33):
                if self.search(str(ipcalc.Network(f'{ip}/{i}').netmask())):
                    return True
            return False
            
    china_ip = ChinaIp()
    china_ip.load()
    print('223.70.163.83' in china_ip)
    

    这个算法看起来没啥毛病,但实际测试中速度比第二种慢了很多,耗时的地方在比较时必须循环所有 n,而二分法可以快速的排除不可能的部分。

    对于这种情况,有两种优化方法:

    1. 随机 n 的列表

    class ChinaIp(object):
    
        ...
        
        def __contains__(self, ip):
            l = list(range(1, 33))
            random.shuffle(l)
            for i in l:
                if self.search(str(ipcalc.Network(f'{ip}/{i}').netmask())):
                    return True
            return False
    

    这种方法在测试中,时间减少了一半多。

    2. 排除不会出现的 n

    class ChinaIp(object):
        def __init__(self):
            ...
            self.mask_list = []
        ...
    
        def insert(self, cidr):
            network = ipcalc.Network(cidr)
            self.data[str(network.netmask())] = True
            self.mask_set.add(network.mask)
    
        def __contains__(self, ip):
    
            for i in self.mask_set:
                netmask = str(ipcalc.Network(f'{ip}/{i}').netmask())
                if netmask in self.data:
                    return True
            return False
    
    

    这样优化后速度和第二种持平,不过实际应用中还需要根据 ip 列表的情况来判断需要用哪种。

    _

    第 1 条附言  ·  158 天前
    方法 3 中的 ``__contains__ `` 有点问题,这里不能修改了,看原文把
    35 条回复    2021-06-30 17:53:08 +08:00
    westoy
        1
    westoy  
       158 天前
    netaddr 大法好
    westoy
        2
    westoy  
       158 天前
    我靠, 又是我, 真是有缘.....
    a719114136
        3
    a719114136  
    OP
       158 天前 via Android
    @westoy 又是你在摸鱼
    sujin190
        4
    sujin190  
       158 天前


    之前写过的一个算法参考下,大多数情况一次判断就能得出结果
    aureole999
        5
    aureole999  
       158 天前
    CIDR 的方法如果建一个 Trie 呢?
    如果测试的 ip 不在 china_ip 里的话不管你随不随机 n 那肯定要查找 32 次,Trie 的话只要 ip 段不是极其零碎的话应该比较次数少点?我瞎猜的
    RicardoY
        6
    RicardoY  
       158 天前 via iPhone
    为什么不用字典树…
    fxrocks
        7
    fxrocks  
       158 天前 via Android
    字典不是直接 d[k]查询 v 么或者 d.get(k)?
    没必要自己写个 2 分法吧
    a719114136
        8
    a719114136  
    OP
       158 天前 via Android
    @fxrocks 一共 3 亿个 ip
    a719114136
        9
    a719114136  
    OP
       158 天前 via Android
    @RicardoY
    @aureole999
    不适用,比如 39.155.42.8 在 39.128.0.0/10 中,但他们只有开头是相同的,没法建字典树
    a719114136
        10
    a719114136  
    OP
       158 天前 via Android
    @sujin190 就是第三种方法,但实际测试中这样方法比二分慢很多
    mywaiting
        11
    mywaiting  
       158 天前
    模仿一下路由器查找路由表的实现?

    感觉这货应该有一个很成熟的算法实现
    djj0809
        12
    djj0809  
       158 天前 via iPhone
    @a719114136 建立二进制的 trie tree 啊,算法最差只需要比较 32 次就可以了
    jinliming2
        13
    jinliming2  
       158 天前
    想到一个算法,和上面 3. 利用 CIDR 的特性 应该一样,没测试:
    1,将 IP cidr 转换为 unsigned int 整数,比如 39.128.0.0/10 转为 662700032/10
    2,根据 cidr 长度分别存到不同的 dict/list 中,比如:{ 10: [662700032, 796917760], 15: [1431699456] }(存 list 就排好序,便于后面去查找)
    3,查询的时候 for cidr in cidr_list 遍历每一个 cidr 表,最多 32 个。假如上面这个 10 和 15 。
    4,把要查询的 IP 转为 unsigned int 整数,根据 cidr 特性,使用按位与 & 运算,仅保留数字的前 cidr 位。
    5,到对应的 dict/list 中查询这个数字。
    a719114136
        15
    a719114136  
    OP
       158 天前
    @djj0809 比较 32 次比二分慢很多,方法 3 就是用类似的思路。
    a719114136
        16
    a719114136  
    OP
       158 天前
    @mywaiting 当时也考虑过,但没搜到。linux 自己倒是用了 ipset,但我看他里面貌似用的是 bitmap,hash 等结构。


    另外路由器上的方案一般都要考虑通用性,也不会添加过多的规则。

    对于这个场景 ip 是有规律的,因此有更好的方法。
    aureole999
        17
    aureole999  
       158 天前
    @a719114136 不啊,全换成 2 进制就可以啊,像 /10 的话 Trie 里只有 10 层,比较前 10 位就 ok 了。
    aureole999
        18
    aureole999  
       158 天前
    实际上都是大段大段的 ip,Trie 基本不会比较那么多次。但你的第三种方法在测试不在 china_ip 里的 ip 时一定要比较 32 次。
    sujin190
        19
    sujin190  
       158 天前
    @a719114136 #10 并不完全是,你这个第三种,如果这个 ip 不在里边并不能保证一次判断就能得出结果

    ip 网段最重要特性就是掩码越短,网络范围越大,所以只要找出所有 ip 的全部可能掩码长度,然后在收集每个掩码长度下每个 ip 网段信息,判断的时候按掩码长度从小到大,如果需要判断的 ip 不在列表里,第一次就能得出结果,存在的也可以再优化到再判断一次就能得出结果
    RicardoY
        20
    RicardoY  
       158 天前 via iPhone
    @a719114136 建 01 字典树…
    dndx
        21
    dndx  
       158 天前
    trie 是正解,Linux 内核的路由表就是这么实现的,绝对错不了:

    https://elixir.bootlin.com/linux/latest/source/net/ipv4/fib_trie.c
    shyrock
        22
    shyrock  
       158 天前
    布隆过滤器?
    nuk
        23
    nuk  
       158 天前
    前缀树啊,速度最快没有之一
    no1xsyzy
        24
    no1xsyzy  
       158 天前
    @shyrock 布隆只能精确匹配,这里要前缀匹配
    当然未必不可以把 /10 的 2^22 个地址全塞进去,但这显然空间效率感人。
    不过也不排除某套 hash 算法组合可以极高效地解决这个问题,但找到这套 hash 算法组合的效率更感人。
    lsylsy2
        25
    lsylsy2  
       158 天前
    Tong Yang, Gaogang Xie, YanBiao Li, Qiaobin Fu, Alex X. Liu, Qi Li, and Laurent Mathy. 2014. Guarantee IP lookup performance with FIB explosion. In Proceedings of the 2014 ACM conference on SIGCOMM (SIGCOMM '14). Association for Computing Machinery, New York, NY, USA, 39–50. DOI:https://doi.org/10.1145/2619239.2626297
    https://dl.acm.org/doi/pdf/10.1145/2619239.2626297

    大学组里的老师做的,CPU 实现自己做一个效果很不错
    shyrock
        26
    shyrock  
       158 天前
    @no1xsyzy #24 你搞错了吧,布隆并不存储地址,只需要一个位阵列空间,空间效率高得可怕。
    Goooogle
        27
    Goooogle  
       158 天前
    刚好做过一样的,说一下实现逻辑
    - 国内 IP 可以认为是多个 IP 段组成,转成类似于 1.1.1.1 -> 1.2.3.4 的结构,相邻的 IP 段可以合并
    - IP 转成 unsigned int,那么一个 IP 段可以表示为类似于[1024, 8192]的数据
    - 以 1024 为 Key,8192 为 Value,存放到 TreeMap 中
    - 在查询时,将传入的 IP 也转成 unsigned int,然后去 TreeMap 查询小于等于 IP 的 Key,然后再判断 Value 是否大于等于 IP 即可

    时间复杂度 O(logn),空间的话 O(ip 段数)
    imn1
        28
    imn1  
       158 天前
    如果已经有 start/end 的分段数据,bisect 好像代码更简单,不用写那么长
    dqzcwxb
        29
    dqzcwxb  
       158 天前
    trie 树,也就是 DFA 算法可解决大数据量关键字匹配问题
    wellsc
        30
    wellsc  
       158 天前
    又到了喜闻乐见的无脑推布隆过滤器时间了
    shuianfendi6
        31
    shuianfendi6  
       158 天前
    类似 ls 做法
    1. TreeMap<Comparable>,Comparable 包含起始 ip
    2. 直接用 TreeRangeMap
    a719114136
        32
    a719114136  
    OP
       157 天前
    @aureole999 那怎么知道哪些 ip 需要比较前 10 位,哪些比钱前 8 位呢
    a719114136
        33
    a719114136  
    OP
       157 天前
    @wellsc
    @shyrock
    bloom 是有误差的,对于匹配的还是得再次判断是否在列表中
    shyrock
        34
    shyrock  
       157 天前
    @a719114136 #33 布隆确实有误判率,但是误判率可以通过调整位数和哈希函数的数量来降低。就看这个应用到底允许多高的误判率了。
    aureole999
        35
    aureole999  
       157 天前
    @a719114136 建树的时候这个地址段是 /10 的,那到第 10 层这个叶子结点的时候下面就没有了,自然只比较 10 次就可以判断了。如果你的 ip list 里所有地址都是 /32 的,那在判断属于 china_ip 的 ip 的时候,你就要比较 32 次了,但不属于 china_ip 的地方还是大段连续的,可能比较几次就能出结果了。
    关于   ·   帮助文档   ·   API   ·   FAQ   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2293 人在线   最高记录 5497   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 27ms · UTC 08:33 · PVG 16:33 · LAX 00:33 · JFK 03:33
    ♥ Do have faith in what you're doing.