V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
louchenabc
V2EX  ›  程序员

[脚本的魅力] 分享一点代码

  •  
  •   louchenabc ·
    lcomplete · 165 天前 · 1293 次点击
    这是一个创建于 165 天前的主题,其中的信息可能已经有所发展或是发生改变。

    v2 都是大牛,所以只分享一点代码,原文描述部分基本都删了,感兴趣的可以查看原文 掘金Github

    一、js 写爬虫

    首先:

    var ids = [1, 2, 3]; // 项目 id
    
    var mrs = []; // 数据结果
    
    function mr(idx) {
      if (idx < ids.length) {
        fetch(
          "https://xxx.com/gitlab/projects/" +
            ids[idx] +
            "/getMergeRequests?order_by=updated_at&page=1&per_page=100&state=all",
          { credentials: "same-origin" }
        )
          .then((res) => res.json())
          .then((r) => {
            if (r.status === "success") {
              mrs.push(...r.result.merge_requests);
            } else {
              console.log(ids[idx] + " failed");
            }
            mr(idx + 1); // 为了避免并行发起过多请求,所以这里采用回调递归调用
          });
      } else {
        console.log("done");
      }
    }
    
    mr(0); // 开始抓取
    

    然后导入 excel ,使用 Power Query M 语言转换数据:

    let responseJson = Json.Document(File.Contents("D:\mrs.json")),
    headers =
     let
     allHeaders = List.Combine(List.Transform(responseJson, Record.FieldNames)),
     uniqueHeaders = List.Distinct(allHeaders)
     in
     uniqueHeaders,
     testTable = Table.FromRecords(responseJson, headers, MissingField.UseNull)
     in
     testTable
    

    二、统计上百个 Java 代码仓库中单元测试的数量

    find . -name '*Test.java' | xargs grep -i '@Test[^a-z]' | awk -F / '{count[$3]++;} END {for(i in count) {print i,count[i]}}' | clip.exe
    

    三、python 扫描 redis

    import redis
    r = redis.Redis()
    t = r.scan()
    while t[0]:
        t = r.scan(t[0])
    

    四、rabbitmq 消息队列转发

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='ip', port=5672, virtual_host='/', credentials=pika.PlainCredentials('account','password')))
    
    channel = connection.channel()
    
    def backcall(ch, method, properties, body):
        # 转发
        channel.basic_publish(exchange='exchange', routing_key='routing_key', body=body)
    
    channel.basic_consume('原队列',backcall, True)
    
    channel.start_consuming()
    connection.close()
    

    五、python 生成 mysql 数据字典

    import mysql.connector
    import importlib
    import sys
    
    
    def generate(database_name):
        """
        生成数据库字典表
        """
        importlib.reload(sys)
    
        # 使用前修改配置
        conn = mysql.connector.connect(
            host='localhost',
            port='3306',
            user='',
            password='',
            use_pure=True
        )
    
        cursor = conn.cursor()
    
        cursor.execute(
            "SELECT TABLE_NAME, TABLE_COMMENT FROM information_schema.TABLES WHERE table_type='BASE TABLE' AND TABLE_SCHEMA='%s'" % database_name
        )
    
        tables = cursor.fetchall()
    
        markdown_table_header = """\n\n\n### %s (%s) \n| 序号 | 字段名称 | 数据类型 | 是否为空 | 字段说明 |\n| :--: |----| ---- | ---- | ---- |\n"""
        markdown_table_row = """| %s | %s | %s | %s | %s |"""
    
        f = open('dict/'+database_name + '.md', 'w', encoding="utf-8")
    
        for table in tables:
    
            cursor.execute(
                "SELECT ORDINAL_POSITION, COLUMN_NAME, COLUMN_TYPE, IS_NULLABLE, COLUMN_COMMENT "
                "FROM information_schema.COLUMNS WHERE TABLE_SCHEMA='%s' AND TABLE_NAME='%s'" % (
                    database_name, table[0]
                )
            )
    
            tmp_table = cursor.fetchall()
            p = markdown_table_header % (table[0], remove_newline(table[1]))
            for col in tmp_table:
                colf = list(col)
                colf[2]=col[2].decode() # mysql 高级版本需要解码,代码有点丑,临时性的,能用就行
                colf[4]=col[4].decode()
                p += (remove_newline(markdown_table_row % tuple(colf)) + "\n")
            print(p)
            f.writelines(p)
    
        f.close()
        cursor.close()
        conn.close()
    
    
    def remove_newline(text):
        """
        去除文本中的换行符号
        """
        return text.replace("\r", "").replace("\n", "")
    
    
    if __name__ == '__main__':
        conn = mysql.connector.connect(
            host='localhost',
            port='3306',
            user='',
            password='',
            use_pure=True
        )
    
        cursor = conn.cursor()
    
        cursor.execute("SHOW DATABASES");
    
        dbs = cursor.fetchall()
    
        for db in dbs:
            generate(db[0])
    
        cursor.close()
        conn.close()
    
    2 条回复    2021-12-07 14:18:58 +08:00
    yamedie
        1
    yamedie  
       165 天前
    我来数数楼主用了几种语言: js / python / sql / shell 命令, 还有个听都没听过的 M 函数
    yamedie
        2
    yamedie  
       165 天前   ❤️ 1
    递归调用 mr(idx + 1)这个写法, qps 还是太高了, 很容易被封 ip, 我一般都 async / await sleep(3000)这样慢慢的请求...
    关于   ·   帮助文档   ·   API   ·   FAQ   ·   我们的愿景   ·   广告投放   ·   感谢   ·   实用小工具   ·   2532 人在线   最高记录 5497   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 38ms · UTC 13:15 · PVG 21:15 · LAX 06:15 · JFK 09:15
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.