请教一个关于线程条件变量的问题

2021-08-10 09:06:48 +08:00
 commoccoom

两个线程,线程 1(processData)生成数据 p 并写入数据库,另一个线程 2(someSocket)将线程 1 生成的数据 p 通过 socket 发送到客户端。

当线程 1 中的for循环结束时,如何通知线程 2 while(res == TRUE)应当结束了,线程 2 因为 pthread_cond_wait 一直在阻塞,但是此时线程 1 不会再发出信号了。这是不是就是死锁了。。。

代码如下

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <unistd.h>
#include <mysql.h>
#include <errno.h>
#include <pthread.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <netinet/in.h>
#define TRUE 1
#define FALSE 0
#define MAX_STRING 128
#define PORT 3389
#define SA struct sockaddr

pthread_cond_t pready = PTHREAD_COND_INITIALIZER;
pthread_mutex_t plock = PTHREAD_MUTEX_INITIALIZER;

// 存储温湿度结构体
typedef struct {
    int temp;
    int humd;
}humiture;

// 可变长消息体
typedef struct { 
    int nLen; 
    char data[ 0];
}MyMessage;

//全局变量
humiture p;
int res = TRUE;

void error(char *msg)
{
    fprintf(stderr, "%s: %s\n", msg, strerror(errno));
    exit(1);
}

void info(char *msg)
{
    fprintf(stdout,"%s\n",msg);
}

void finish_with_error(MYSQL *con)
{
  fprintf(stderr, "%s\n", mysql_error(con));
  mysql_close(con);
  exit(1);
}

// socket 发送数据
int sendall(int s, char *buf, int *len)
{
    int total = 0;
    int bytesleft = *len;
    int n;

    while(total < *len) {
        n = send(s, buf+total, bytesleft, 0);
        if (n == -1) { break; }
        total += n;
        bytesleft -= n;
    }

    *len = total;

    return n==-1?-1:0;
} 

// 生成数据
humiture  collectData()
{
    int temperature,humidity;
    srand((unsigned)time(NULL));    // 根据时间来播种随机数种子
    // 生成数据
    temperature = rand()%40+10;     // 生成 10~50 的随机数 当做温度
    humidity = rand()%70+10;    // 生成 10~80 的随机数当做湿度
    
    humiture p = {humidity, temperature};
    
    return p;
}

// 启动 MySQL 建立连接
MYSQL* startMysql()
{
    MYSQL *con = mysql_init(NULL);

    if (con == NULL)
    {        
        fprintf(stderr, "%s\n", mysql_error(con));
        exit(1);
    }

    if (mysql_real_connect(con, "localhost", "root", "root#admin","test", 0, NULL, 0) == NULL)
    {
        finish_with_error(con);
    }

    return con;
}

// 生成数据并存入数据库
void * processData()
{
    MYSQL * con = startMysql();

    for(int i = 0; i <20;i++)
    {
       
        pthread_mutex_lock(&plock);
        p = collectData();
        pthread_cond_signal(&pready);
        pthread_mutex_unlock(&plock);

        char query[MAX_STRING] = {0};

        snprintf(query,MAX_STRING,"INSERT INTO humiture (temperature,humidity) VALUES (%d,%d)", p.temp, p.humd);

        if (mysql_query(con, query)) 
        {
            finish_with_error(con);
        }
        sleep(2);
    } 

    // 循环结束给出信号
    res = FALSE;

    mysql_close(con); 
    mysql_library_end();
    return NULL;
}

void * someSocket()
{
    int sockfd, connfd;
    struct sockaddr_in servaddr, cli;
    socklen_t len;
    char buff[10];
    
    sockfd = socket(AF_INET, SOCK_STREAM, 0);
    if (sockfd == -1) {
        error("socket creation failed...");
    }
    else
        info("Socket successfully created...");

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    servaddr.sin_port = htons(PORT);

    int reuse = 1;
    if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(int)) == -1)
        error("Can't set the reuse option on the socket...");

    if ((bind(sockfd, (SA*)&servaddr, sizeof(servaddr))) != 0) {
        error("socket bind failed...");
    }
    else
        fprintf(stdout,"%s\n","Socket successfully binded...");

    if ((listen(sockfd, 5)) != 0) {
        error("Listen failed...");
    }
    else
       info("Server listening...");    

    while(TRUE)
    {
        len = sizeof(cli);        
        connfd = accept(sockfd, (SA*)&cli, &len);    
        if (connfd < 0) {
            error("server acccept failed...");
        }
        else
            info("server acccept the client...");
        
        MyMessage * myMessage = (MyMessage*)malloc(sizeof(MyMessage)+sizeof(humiture));
        int needSend = sizeof(MyMessage)+sizeof(humiture);  
        char *buffer =(char*)malloc(needSend);

        while(res == TRUE)
        {
            myMessage->nLen = htonl(sizeof(humiture));
            pthread_mutex_lock(&plock);
            pthread_cond_wait(&pready,&plock);
            memcpy(myMessage->data,&p,sizeof(humiture));
            pthread_mutex_unlock(&plock);
            memcpy(buffer,myMessage,needSend);
            sendall(connfd,buffer,&needSend);
            recv(connfd,buff,sizeof(buff),0);
        }
        // 当需要停止的时候发送 0 字节信息让客户端停止循环
        if(res == FALSE) 
        {
            // 将发送消息定义为 0
            myMessage->nLen = htonl(res);
            char *buffer =(char*)malloc(sizeof(int));
            memcpy(buffer,myMessage,sizeof(MyMessage));
            send(connfd,buffer,sizeof(MyMessage),0);
            shutdown(connfd,SHUT_RDWR);
            free(buffer);
        }

        free(myMessage);
        free(buffer);
        close(connfd);
        break;
    }
    close(sockfd);    
    return NULL;
}

int main(void)
{
    pthread_t t0,t1;

    if(pthread_create(&t0, NULL,processData,NULL)==-1)
    {
        error("Can't create thread processData");
    }
    if(pthread_create(&t1,NULL,someSocket,NULL)==-1)
    {
         error("Can't create thread someSocket");
    }

    void *reslut;
    if(pthread_join(t0,&reslut)==-1)
    {
        error("Can't reclaim thread t0");
    }
    if(pthread_join(t1,&reslut)==-1)
    {
        error("Can't reclaim thread t1");
    }

    return 0;
}
1376 次点击
所在节点    C
4 条回复
wevsty
2021-08-10 09:29:11 +08:00
线程 1 退出的时候再发一个 pthread_cond_signal 这样线程 2 收到信号就不会锁死了。最后线程 2 检查一下退出标志再发数据不就行了么?
commoccoom
2021-08-10 10:36:52 +08:00
@wevsty 确实,我人傻了😂
FranzKafka95
2021-08-10 13:11:31 +08:00
或者线程 1 执行完 for 循环以后先别置 res 为 false,再加一个条件变量在这儿等,等到线程 2 发送完以后通过信号量通知线程 1 再置为 false,等到线程 2 再执行 while 循环时 res 已经为 flalse,这样就可以同步了。
commoccoom
2021-08-10 13:33:59 +08:00
@FranzKafka95 后续我的想法是线程 1(processData)会一直循环,线程 2(someSocket)是接收别的信号,然后启动或者停止这样。所以线程 1 不能阻塞,线程 2 可以阻塞。线程 1 现在加了循环次数是因为之前我发现有内存泄漏,所以加个停止条件看看哪里有问题😂。

这是一个专为移动设备优化的页面(即为了让你能够在 Google 搜索结果里秒开这个页面),如果你希望参与 V2EX 社区的讨论,你可以继续到 V2EX 上打开本讨论主题的完整版本。

https://www.v2ex.com/t/794758

V2EX 是创意工作者们的社区,是一个分享自己正在做的有趣事物、交流想法,可以遇见新朋友甚至新机会的地方。

V2EX is a community of developers, designers and creative people.

© 2021 V2EX