V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
yuanfnadi
V2EX  ›  Go 编程语言

Go 初学者想要 V 友帮忙看一下代码

  •  
  •   yuanfnadi · 2017-07-17 21:26:51 +08:00 · 1793 次点击
    这是一个创建于 2475 天前的主题,其中的信息可能已经有所发展或是发生改变。

    主要目的是为了把 fabio 的日志写入到 kafka。 用 cmd 执行 fabio 的启动日志,读取 stdout 和 stderr。然后从控制台输出,还有就是写入到 kafka。

    package main
    
    import (
    	"bufio"
    	"fmt"
    	"io"
    	"os/exec"
    	"strings"
    	"github.com/Shopify/sarama"
    )
    
    
    
    func main() {
    	execCommand()
    }
    
    func execCommand() {
    	cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties")
    	fmt.Println(cmd.Args)
    	stdout, err := cmd.StdoutPipe()
    	if err != nil {
    		fmt.Println(err)
    	}
    	stderr, err := cmd.StderrPipe()
    	if err != nil {
    		fmt.Println(err)
    	}
    	cmd.Start()
    	go printLog(stdout)
    	go printLog(stderr)
    	cmd.Wait()
    }
    
    func printLog(readCloser io.ReadCloser)  {
    	reader := bufio.NewReader(readCloser)
    	for {
    		line, err2 := reader.ReadString('\n')
    		if err2 != nil || io.EOF == err2 {
    			break
    		}
    		fmt.Print(line)
    	}
    }
    

    作为 JAVA 程序员,第一次写 go 语言,就 go 有些不是很了解。 这样在并发高的时候会不会有问题,消耗内存会不会过多。 有没有更好的写法?输出到 kafka 的内容变成一个函数 写在 printLog 里面如何?

    11 条回复    2017-07-18 03:10:34 +08:00
    yuanfnadi
        1
    yuanfnadi  
    OP
       2017-07-17 21:34:40 +08:00
    另外 kafka 的对象,我是 java 里面一般是定义一个 private 的对象,然后用 getProducer 来实现的。golang 是否是这样?
    yuanfnadi
        2
    yuanfnadi  
    OP
       2017-07-17 22:21:24 +08:00
    ````golang
    package main

    import (
    "bufio"
    "fmt"
    "io"
    "os/exec"
    "strings"
    "github.com/Shopify/sarama"
    )


    var (
    asyncProducer *sarama.AsyncProducer
    )



    func main() {
    fmt.Println(gerProducer())
    fmt.Println(gerProducer())

    execCommand()
    }

    func execCommand() {
    cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties")
    fmt.Println(cmd.Args)
    stdout, err := cmd.StdoutPipe()
    if err != nil {
    fmt.Println(err)
    }
    stderr, err := cmd.StderrPipe()
    if err != nil {
    fmt.Println(err)
    }
    cmd.Start()
    go printLog(stdout)
    go printLog(stderr)
    cmd.Wait()
    }

    func printLog(readCloser io.ReadCloser) {
    reader := bufio.NewReader(readCloser)
    for {
    line, err2 := reader.ReadString('\n')
    if err2 != nil || io.EOF == err2 {
    break
    }
    fmt.Print(line)
    go sendMessage(line)
    }
    }

    func gerProducer() sarama.AsyncProducer {
    if asyncProducer != nil {
    fmt.Println("Sd")
    return *asyncProducer
    }else {
    fmt.Println("diyici")
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = sarama.WaitForAll
    p, err := sarama.NewAsyncProducer(strings.Split("192.168.1.1:9092", ","), config)
    if err != nil {
    fmt.Println("kafka failed")
    }
    asyncProducer = &p
    return *asyncProducer
    }
    }

    func sendMessage(message string) {
    msg := &sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.ByteEncoder(message),
    }
    p:=*asyncProducer
    p.Input() <- msg
    }
    ````
    rrfeng
        3
    rrfeng  
       2017-07-17 22:23:18 +08:00
    不是很了解 Java
    高并发是指 fabio 输出很多很多 log ?那可能这里 fmt.Print() 是性能瓶颈。可以参考一下各种日志库。

    感觉你的需求可以直接用 filebeat
    如果是练手那当我没说
    yuanfnadi
        4
    yuanfnadi  
    OP
       2017-07-17 22:25:18 +08:00
    @rrfeng fmt 到时候应该会用配置来关闭,fmt.print()的代码会替换成 func sendMessage(message string)

    主要是第一次写 go 完全不知道自己代码有没有结构性错误,用的都是 java 的思想来写,所以想请一个擅长 go 的帮我指导一下。
    yuanfnadi
        5
    yuanfnadi  
    OP
       2017-07-17 22:33:35 +08:00
    yuanfnadi
        6
    yuanfnadi  
    OP
       2017-07-17 22:37:02 +08:00
    hcymk2
        7
    hcymk2  
       2017-07-17 23:43:32 +08:00
    还不如自己先测试下。
    akrf
        8
    akrf  
       2017-07-18 01:43:01 +08:00
    作为 Java 程序员,肯定写啥都像 Java,别挣扎了,老老实实用吧
    jarlyyn
        9
    jarlyyn  
       2017-07-18 02:13:14 +08:00   ❤️ 1
    看得我一愣一愣的。特别是这么多 go。

    对你用的工具不熟悉。

    但是不明白你为什么起这么多协程。

    如果是我的话,首先不会写这个 gerProducer,初始化的时候直接初始化不就好了么。没看出来它啥时候会变 nil。

    其次感觉你是为了尽可能快的异步读日志再转写?

    那感觉开一个 message 的 chan,设一个恰当的缓存大小。

    然后 fabio 那里 go 两个协程出来写入 chan

    sendMessage 那里 go 个写成出来读 chan,感觉更符合调理一些。

    个人 YY,对你这个业务不熟。

    另外,按你这个写法是不是要维护个链接池?

    不然连接数容易炸吧?
    jarlyyn
        10
    jarlyyn  
       2017-07-18 03:05:20 +08:00
    如果我写可能是这样?

    var asyncProducer *sarama.AsyncProducer
    var cmd *exec.cmd
    var quit make (chan int)
    var stdout io.ReadCloser
    var stderr io.ReadCloser
    var outputs make(chan string,10)
    func main(){
    initProducer()
    defer asyncProducer.Close()
    initCmd()
    initStdout()
    defer stdout.close()
    pipeMessage(stdout,outputs)
    initStderr()
    defer stderr.close()
    pipeMessage(stderr,outputs)
    for{
    select{
    case quit:
    return
    case newline:= <- outputs :
    asyncProducer.Input() <-&sarama.ProducerMessage{
    Topic: "test",
    Value: sarama.ByteEncoder(newline),
    }
    case err := <-asyncProducer.Errors():
    fmt.Println("kafka error")
    case <-asyncProducer.Successes():
    }
    }

    }
    func initProducer(){
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true
    config.Producer.RequiredAcks = sarama.WaitForAll
    p, err := sarama.NewAsyncProducer(strings.Split("192.168.1.1:9092", ","), config)
    if err != nil {
    fmt.Println("kafka failed")
    panic(err)
    }
    asyncProducer = &p

    }
    func initCmd(){
    cmd := exec.Command("/fabio", "-cfg", "/etc/fabio/fabio.properties")
    fmt.Println(cmd.Args)
    go func(){
    defer close(quit)
    err:=cmd.Run()
    if err!=nil{
    panic(err)
    }
    }
    }

    func initStdout(){
    stdout, err := cmd.StdoutPipe()
    if err != nil {
    panic(err)
    }
    }
    func initStderr(){
    stderr, err := cmd.StderrPipe()
    if err != nil {
    panic(err)
    }
    }
    func pipeMessage(reader io.ReadCloser,output chan string){
    reader := bufio.NewReader(readCloser)
    go func(){
    for {
    line, err2 := reader.ReadString('\n')
    if err2 != nil || io.EOF == err2 {
    break
    }
    fmt.Print(line)
    output <- line
    }
    }
    }
    jarlyyn
        11
    jarlyyn  
       2017-07-18 03:10:34 +08:00
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   我们的愿景   ·   实用小工具   ·   2767 人在线   最高记录 6543   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 25ms · UTC 15:20 · PVG 23:20 · LAX 08:20 · JFK 11:20
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.