c语言环形队列

一维数组队列

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

// Ring capacity in bytes (256 MiB). The expression is parenthesised so that
// uses such as `x % BUFFER_SIZE` apply the operator to the whole product
// instead of to the leading 256 only.
#define BUFFER_SIZE (256 * 1024 * 1024)
// Size in bytes of one ring slot / one UDP datagram.
#define PACKET_SIZE 8192
// Size in bytes of the header stored at the start of each slot.
#define HEADER_SIZE 16

/*
 * Ring buffer shared by the reader and sender threads. The read and write
 * functions take `lock` before touching the indices and the count.
 */
typedef struct {
    char* buffer;              // backing storage allocated in circular_buffer_init()
    int read_idx;              // byte offset of the next slot to read; advanced by PACKET_SIZE
    int write_idx;             // byte offset of the next slot to write; advanced by PACKET_SIZE
    int count;                 // slots written but not yet read
    pthread_mutex_t lock;      // taken by circular_buffer_write() and circular_buffer_read()
    pthread_cond_t not_full;   // signalled by circular_buffer_read() after a slot is consumed
    pthread_cond_t not_empty;  // signalled by circular_buffer_write() after a slot is stored
} CircularBuffer;

// Allocates and initialises the ring; release it with circular_buffer_destroy().
CircularBuffer* circular_buffer_init();
// Waits on not_full while the ring reports full, then stores one slot taken from `data`.
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length);
// Waits on not_empty while the ring is empty, then copies `length` bytes of the oldest slot into `data`.
void circular_buffer_read(CircularBuffer* buffer, char* data, int length);
// Destroys the synchronisation objects and frees the storage and the ring itself.
void circular_buffer_destroy(CircularBuffer* buffer);
// Producer thread entry: reads a file and writes packets into the ring passed as `arg`.
void* read_thread(void* arg);
// Consumer thread entry: reads packets from the ring passed as `arg` and sends them over UDP.
void* send_thread(void* arg);

/*
 * Program entry point: creates the shared ring buffer, starts the file-reader
 * (producer) and UDP-sender (consumer) threads, waits for both and releases
 * the buffer.
 *
 * Returns 0 once both threads have been joined, or EXIT_FAILURE when the
 * buffer or one of the threads could not be created.
 */
int main() {
    CircularBuffer* buffer = circular_buffer_init();
    if (buffer == NULL) {
        fprintf(stderr, "Failed to create circular buffer.\n");
        return EXIT_FAILURE;
    }

    pthread_t thread1, thread2;
    // pthread_create() reports failure through its return value; joining a
    // thread that was never created is undefined, so stop here instead.
    if (pthread_create(&thread1, NULL, read_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create reader thread.\n");
        circular_buffer_destroy(buffer);
        return EXIT_FAILURE;
    }
    if (pthread_create(&thread2, NULL, send_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create sender thread.\n");
        // The reader thread is running and still uses the buffer, so it is
        // not destroyed here; returning from main ends the whole process.
        return EXIT_FAILURE;
    }

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    circular_buffer_destroy(buffer);

    return 0;
}

/*
 * Allocates a ring buffer with BUFFER_SIZE bytes of storage, resets its
 * indices and count, and initialises its mutex and condition variables.
 *
 * Returns the new buffer, or NULL when either allocation fails. The caller
 * releases it with circular_buffer_destroy().
 */
CircularBuffer* circular_buffer_init() {
    CircularBuffer* buffer = malloc(sizeof *buffer);
    if (buffer == NULL) {
        return NULL;
    }

    buffer->buffer = malloc(BUFFER_SIZE);
    if (buffer->buffer == NULL) {
        free(buffer);
        return NULL;
    }

    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
    return buffer;
}

/*
 * Stores one packet in the next free slot of the ring, blocking while every
 * slot is occupied.
 *
 * buffer: ring to write to.
 * data:   HEADER_SIZE header bytes followed by the payload.
 * length: payload bytes that follow the header in `data`. Values outside
 *         0 .. PACKET_SIZE - HEADER_SIZE are clamped so that neither `data`
 *         nor the slot is overrun.
 *
 * The unused tail of the slot is zero-filled. Signals not_empty afterwards.
 */
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    // `count` counts slots, so "full" is the number of slots, not the byte
    // size. BUFFER_SIZE is parenthesised at each use so the arithmetic holds
    // however the macro is spelled.
    const int slot_count = (BUFFER_SIZE) / PACKET_SIZE;
    const int max_payload = PACKET_SIZE - HEADER_SIZE;

    if (length < 0) {
        length = 0;
    } else if (length > max_payload) {
        length = max_payload;
    }
    const int used = HEADER_SIZE + length;

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == slot_count) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    char* slot = buffer->buffer + buffer->write_idx;
    memcpy(slot, data, used);
    if (used < PACKET_SIZE) {
        memset(slot + used, 0, PACKET_SIZE - used);
    }

    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Removes the oldest packet slot from the ring, blocking while the ring is
 * empty, and copies the first `length` bytes of that slot into `data`.
 *
 * length is clamped to 0 .. PACKET_SIZE so the copy never leaves the slot.
 * Signals not_full afterwards.
 */
void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    if (length < 0) {
        length = 0;
    } else if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    memcpy(data, buffer->buffer + buffer->read_idx, length);
    // Parenthesise the macro so the modulo applies to the whole ring size.
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Tears down a ring created by circular_buffer_init(): destroys the mutex and
 * both condition variables, then frees the storage and the ring object.
 */
void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_cond_t* conditions[] = { &buffer->not_full, &buffer->not_empty };

    pthread_mutex_destroy(&buffer->lock);
    for (size_t i = 0; i < sizeof conditions / sizeof conditions[0]; i++) {
        pthread_cond_destroy(conditions[i]);
    }

    char* storage = buffer->buffer;
    free(storage);
    free(buffer);
}

/*
 * Producer thread: reads "input.txt" in chunks of up to
 * PACKET_SIZE - HEADER_SIZE bytes, prefixes each chunk with a fixed
 * HEADER_SIZE-byte header and writes it into the ring passed as `arg`.
 *
 * Returns NULL in all cases; failures are reported on stdout.
 */
void* read_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    FILE* file = fopen("input.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        return NULL;
    }

    char* data = calloc(1, PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        fclose(file);
        return NULL;
    }
    char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    size_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        // circular_buffer_write() adds HEADER_SIZE to the length it is given,
        // so pass the payload size only. Passing PACKET_SIZE made it copy 16
        // bytes past the end of `data`, and a short final read would carry
        // stale bytes from the previous chunk.
        circular_buffer_write(buffer, data, (int)bytesRead);
    }

    free(data);
    fclose(file);
    return NULL;
}

/*
 * Consumer thread: takes packets from the ring passed as `arg` and sends each
 * one as a PACKET_SIZE-byte UDP datagram to 127.0.0.1:1234.
 *
 * Returns NULL in all cases; failures are reported on stdout.
 */
void* send_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        return NULL;
    }

    struct sockaddr_in server_address;
    // Clear the padding bytes of the address before filling it in.
    memset(&server_address, 0, sizeof(server_address));
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char* data = malloc(PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        close(udp_socket);
        return NULL;
    }

    // NOTE(review): nothing tells this loop that the reader has finished.
    // Once the ring drains, circular_buffer_read() waits forever, so only a
    // send failure ends the loop.
    while (1) {
        circular_buffer_read(buffer, data, PACKET_SIZE);

        ssize_t bytesSent = sendto(udp_socket, data, PACKET_SIZE, 0, (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            break;  // fall through to the cleanup below instead of leaking
        }
    }

    free(data);
    close(udp_socket);
    return NULL;
}

全部打印

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

// Ring capacity in bytes (256 MiB). The expression is parenthesised so that
// uses such as `x % BUFFER_SIZE` apply the operator to the whole product
// instead of to the leading 256 only.
#define BUFFER_SIZE (256 * 1024 * 1024)
// Size in bytes of one ring slot / one UDP datagram.
#define PACKET_SIZE 8192
// Size in bytes of the header stored at the start of each slot.
#define HEADER_SIZE 16

/*
 * Ring buffer shared by the reader and sender threads. The read and write
 * functions take `lock` before touching the indices and the count.
 */
typedef struct {
    char* buffer;              // backing storage allocated in circular_buffer_init()
    int read_idx;              // byte offset of the next slot to read; advanced by PACKET_SIZE
    int write_idx;             // byte offset of the next slot to write; advanced by PACKET_SIZE
    int count;                 // slots written but not yet read
    pthread_mutex_t lock;      // taken by circular_buffer_write() and circular_buffer_read()
    pthread_cond_t not_full;   // signalled by circular_buffer_read() after a slot is consumed
    pthread_cond_t not_empty;  // signalled by circular_buffer_write() after a slot is stored
} CircularBuffer;

// Allocates and initialises the ring; release it with circular_buffer_destroy().
CircularBuffer* circular_buffer_init();
// Waits on not_full while the ring reports full, stores one slot taken from `data`, then hex-dumps it.
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length);
// Waits on not_empty while the ring is empty, copies `length` bytes of the oldest slot into `data`, then hex-dumps them.
void circular_buffer_read(CircularBuffer* buffer, char* data, int length);
// Destroys the synchronisation objects and frees the storage and the ring itself.
void circular_buffer_destroy(CircularBuffer* buffer);
// Prints `length` bytes of `data` to stdout as hexadecimal.
void print_hex(const char* data, int length);
// Producer thread entry: reads a file and writes packets into the ring passed as `arg`.
void* read_thread(void* arg);
// Consumer thread entry: reads packets from the ring passed as `arg` and sends them over UDP.
void* send_thread(void* arg);

/*
 * Program entry point: creates the shared ring buffer, starts the file-reader
 * (producer) and UDP-sender (consumer) threads, waits for both and releases
 * the buffer.
 *
 * Returns 0 once both threads have been joined, or EXIT_FAILURE when the
 * buffer or one of the threads could not be created.
 */
int main() {
    CircularBuffer* buffer = circular_buffer_init();
    if (buffer == NULL) {
        fprintf(stderr, "Failed to create circular buffer.\n");
        return EXIT_FAILURE;
    }

    pthread_t thread1, thread2;
    // pthread_create() reports failure through its return value; joining a
    // thread that was never created is undefined, so stop here instead.
    if (pthread_create(&thread1, NULL, read_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create reader thread.\n");
        circular_buffer_destroy(buffer);
        return EXIT_FAILURE;
    }
    if (pthread_create(&thread2, NULL, send_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create sender thread.\n");
        // The reader thread is running and still uses the buffer, so it is
        // not destroyed here; returning from main ends the whole process.
        return EXIT_FAILURE;
    }

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    circular_buffer_destroy(buffer);

    return 0;
}

/*
 * Allocates a ring buffer with BUFFER_SIZE bytes of storage, resets its
 * indices and count, and initialises its mutex and condition variables.
 *
 * Returns the new buffer, or NULL when either allocation fails. The caller
 * releases it with circular_buffer_destroy().
 */
CircularBuffer* circular_buffer_init() {
    CircularBuffer* buffer = malloc(sizeof *buffer);
    if (buffer == NULL) {
        return NULL;
    }

    buffer->buffer = malloc(BUFFER_SIZE);
    if (buffer->buffer == NULL) {
        free(buffer);
        return NULL;
    }

    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
    return buffer;
}

/*
 * Stores one packet in the next free slot of the ring, blocking while every
 * slot is occupied, then prints the stored bytes in hexadecimal.
 *
 * buffer: ring to write to.
 * data:   HEADER_SIZE header bytes followed by the payload.
 * length: payload bytes that follow the header in `data`. Values outside
 *         0 .. PACKET_SIZE - HEADER_SIZE are clamped so that neither `data`
 *         nor the slot is overrun.
 *
 * The unused tail of the slot is zero-filled. Signals not_empty afterwards.
 */
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    // `count` counts slots, so "full" is the number of slots, not the byte
    // size. BUFFER_SIZE is parenthesised at each use so the arithmetic holds
    // however the macro is spelled.
    const int slot_count = (BUFFER_SIZE) / PACKET_SIZE;
    const int max_payload = PACKET_SIZE - HEADER_SIZE;

    if (length < 0) {
        length = 0;
    } else if (length > max_payload) {
        length = max_payload;
    }
    const int used = HEADER_SIZE + length;

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == slot_count) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    char* slot = buffer->buffer + buffer->write_idx;
    memcpy(slot, data, used);
    if (used < PACKET_SIZE) {
        memset(slot + used, 0, PACKET_SIZE - used);
    }

    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);

    // Print the header and payload bytes that were written, outside the lock.
    print_hex(data, used);
}

/*
 * Removes the oldest packet slot from the ring, blocking while the ring is
 * empty, copies the first `length` bytes of that slot into `data` and prints
 * them in hexadecimal.
 *
 * length is clamped to 0 .. PACKET_SIZE so the copy never leaves the slot.
 * Signals not_full afterwards.
 */
void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    if (length < 0) {
        length = 0;
    } else if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    memcpy(data, buffer->buffer + buffer->read_idx, length);
    // Parenthesise the macro so the modulo applies to the whole ring size.
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);

    // Print the bytes that were read, outside the lock.
    print_hex(data, length);
}

/*
 * Tears down a ring created by circular_buffer_init(): destroys the mutex and
 * both condition variables, then frees the storage and the ring object.
 */
void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_cond_t* conditions[] = { &buffer->not_full, &buffer->not_empty };

    pthread_mutex_destroy(&buffer->lock);
    for (size_t i = 0; i < sizeof conditions / sizeof conditions[0]; i++) {
        pthread_cond_destroy(conditions[i]);
    }

    char* storage = buffer->buffer;
    free(storage);
    free(buffer);
}

/*
 * Prints `length` bytes of `data` to stdout as space-separated two-digit
 * uppercase hexadecimal values, prefixed with "Data: " and ended by a newline.
 */
void print_hex(const char* data, int length) {
    printf("Data: ");
    for (int i = 0; i < length; i++) {
        // Cast so bytes >= 0x80 are not sign-extended when promoted.
        printf("%02X ", (unsigned char)data[i]);
    }
    // End the line; a bare "n" would print the letter instead of a newline.
    printf("\n");
}

/*
 * Producer thread: reads "./123.txt" in chunks of up to
 * PACKET_SIZE - HEADER_SIZE bytes, prefixes each chunk with a fixed
 * HEADER_SIZE-byte header and writes it into the ring passed as `arg`.
 *
 * Returns NULL in all cases; failures are reported on stdout.
 */
void* read_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    FILE* file = fopen("./123.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        return NULL;
    }

    char* data = calloc(1, PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        fclose(file);
        return NULL;
    }
    char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    size_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        // circular_buffer_write() adds HEADER_SIZE to the length it is given,
        // so pass the payload size only. Passing PACKET_SIZE made it copy 16
        // bytes past the end of `data`, and a short final read would carry
        // stale bytes from the previous chunk.
        circular_buffer_write(buffer, data, (int)bytesRead);
    }

    free(data);
    fclose(file);
    return NULL;
}

/*
 * Consumer thread: takes packets from the ring passed as `arg`, sends each
 * one as a PACKET_SIZE-byte UDP datagram to 127.0.0.1:1234 and prints the
 * sent bytes in hexadecimal.
 *
 * Returns NULL in all cases; failures are reported on stdout.
 */
void* send_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        return NULL;
    }

    struct sockaddr_in server_address;
    // Clear the padding bytes of the address before filling it in.
    memset(&server_address, 0, sizeof(server_address));
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char* data = malloc(PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        close(udp_socket);
        return NULL;
    }

    // NOTE(review): nothing tells this loop that the reader has finished.
    // Once the ring drains, circular_buffer_read() waits forever, so only a
    // send failure ends the loop.
    while (1) {
        circular_buffer_read(buffer, data, PACKET_SIZE);

        ssize_t bytesSent = sendto(udp_socket, data, PACKET_SIZE, 0, (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            break;  // fall through to the cleanup below instead of leaking
        }

        // Print the UDP sent data in hexadecimal format
        print_hex(data, PACKET_SIZE);
    }

    free(data);
    close(udp_socket);
    return NULL;
}

只打印发送

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>

// Ring capacity in bytes (256 MiB). The expression is parenthesised so that
// uses such as `x % BUFFER_SIZE` apply the operator to the whole product
// instead of to the leading 256 only.
#define BUFFER_SIZE (256 * 1024 * 1024)
// Size in bytes of one ring slot.
#define PACKET_SIZE 8192
// Size in bytes of the header stored at the start of each slot.
#define HEADER_SIZE 16

/*
 * Ring buffer shared by the reader and sender threads. The read and write
 * functions take `lock` before touching the indices and the count.
 */
typedef struct {
    char* buffer;              // backing storage allocated in circular_buffer_init()
    int read_idx;              // byte offset of the next slot to read; advanced by PACKET_SIZE
    int write_idx;             // byte offset of the next slot to write; advanced by PACKET_SIZE
    int count;                 // slots written but not yet read
    pthread_mutex_t lock;      // taken by circular_buffer_write() and circular_buffer_read()
    pthread_cond_t not_full;   // signalled by circular_buffer_read() after a slot is consumed
    pthread_cond_t not_empty;  // signalled by circular_buffer_write() after a slot is stored
} CircularBuffer;

// Allocates and initialises the ring; release it with circular_buffer_destroy().
CircularBuffer* circular_buffer_init();
// Waits on not_full while the ring reports full, then stores one slot taken from `data`.
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length);
// Waits on not_empty while the ring is empty, then copies `length` bytes of the oldest slot into `data`.
void circular_buffer_read(CircularBuffer* buffer, char* data, int length);
// Destroys the synchronisation objects and frees the storage and the ring itself.
void circular_buffer_destroy(CircularBuffer* buffer);
// Prints `length` bytes of `data` to stdout as hexadecimal.
void print_hex(const char* data, int length);
// Producer thread entry: reads a file and writes packets into the ring passed as `arg`.
void* read_thread(void* arg);
// Consumer thread entry: reads packets from the ring passed as `arg` and sends their payload over UDP.
void* send_thread(void* arg);

/*
 * Program entry point: creates the shared ring buffer, starts the file-reader
 * (producer) and UDP-sender (consumer) threads, waits for both and releases
 * the buffer.
 *
 * Returns 0 once both threads have been joined, or EXIT_FAILURE when the
 * buffer or one of the threads could not be created.
 */
int main() {
    CircularBuffer* buffer = circular_buffer_init();
    if (buffer == NULL) {
        fprintf(stderr, "Failed to create circular buffer.\n");
        return EXIT_FAILURE;
    }

    pthread_t thread1, thread2;
    // pthread_create() reports failure through its return value; joining a
    // thread that was never created is undefined, so stop here instead.
    if (pthread_create(&thread1, NULL, read_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create reader thread.\n");
        circular_buffer_destroy(buffer);
        return EXIT_FAILURE;
    }
    if (pthread_create(&thread2, NULL, send_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create sender thread.\n");
        // The reader thread is running and still uses the buffer, so it is
        // not destroyed here; returning from main ends the whole process.
        return EXIT_FAILURE;
    }

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);

    circular_buffer_destroy(buffer);

    return 0;
}

/*
 * Allocates a ring buffer with BUFFER_SIZE bytes of storage, resets its
 * indices and count, and initialises its mutex and condition variables.
 *
 * Returns the new buffer, or NULL when either allocation fails. The caller
 * releases it with circular_buffer_destroy().
 */
CircularBuffer* circular_buffer_init() {
    CircularBuffer* buffer = malloc(sizeof *buffer);
    if (buffer == NULL) {
        return NULL;
    }

    buffer->buffer = malloc(BUFFER_SIZE);
    if (buffer->buffer == NULL) {
        free(buffer);
        return NULL;
    }

    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
    return buffer;
}

/*
 * Stores one packet in the next free slot of the ring, blocking while every
 * slot is occupied.
 *
 * buffer: ring to write to.
 * data:   HEADER_SIZE header bytes followed by the payload.
 * length: payload bytes that follow the header in `data`. Values outside
 *         0 .. PACKET_SIZE - HEADER_SIZE are clamped so that neither `data`
 *         nor the slot is overrun.
 *
 * The unused tail of the slot is zero-filled. Signals not_empty afterwards.
 */
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    // `count` counts slots, so "full" is the number of slots, not the byte
    // size. BUFFER_SIZE is parenthesised at each use so the arithmetic holds
    // however the macro is spelled.
    const int slot_count = (BUFFER_SIZE) / PACKET_SIZE;
    const int max_payload = PACKET_SIZE - HEADER_SIZE;

    if (length < 0) {
        length = 0;
    } else if (length > max_payload) {
        length = max_payload;
    }
    const int used = HEADER_SIZE + length;

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == slot_count) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    char* slot = buffer->buffer + buffer->write_idx;
    memcpy(slot, data, used);
    if (used < PACKET_SIZE) {
        memset(slot + used, 0, PACKET_SIZE - used);
    }

    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Removes the oldest packet slot from the ring, blocking while the ring is
 * empty, and copies the first `length` bytes of that slot into `data`.
 *
 * length is clamped to 0 .. PACKET_SIZE so the copy never leaves the slot.
 * Signals not_full afterwards.
 */
void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    if (length < 0) {
        length = 0;
    } else if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    memcpy(data, buffer->buffer + buffer->read_idx, length);
    // Parenthesise the macro so the modulo applies to the whole ring size.
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Tears down a ring created by circular_buffer_init(): destroys the mutex and
 * both condition variables, then frees the storage and the ring object.
 */
void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_cond_t* conditions[] = { &buffer->not_full, &buffer->not_empty };

    pthread_mutex_destroy(&buffer->lock);
    for (size_t i = 0; i < sizeof conditions / sizeof conditions[0]; i++) {
        pthread_cond_destroy(conditions[i]);
    }

    char* storage = buffer->buffer;
    free(storage);
    free(buffer);
}

/*
 * Prints `length` bytes of `data` to stdout as space-separated two-digit
 * uppercase hexadecimal values, prefixed with "Data: " and ended by a newline.
 */
void print_hex(const char* data, int length) {
    printf("Data: ");
    for (int i = 0; i < length; i++) {
        // Cast so bytes >= 0x80 are not sign-extended when promoted.
        printf("%02X ", (unsigned char)data[i]);
    }
    // End the line; a bare "n" would print the letter instead of a newline.
    printf("\n");
}

/*
 * Producer thread: reads "input.txt" in chunks of up to
 * PACKET_SIZE - HEADER_SIZE bytes, prefixes each chunk with a fixed
 * HEADER_SIZE-byte header and writes it into the ring passed as `arg`.
 *
 * Returns NULL in all cases; failures are reported on stdout.
 */
void* read_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    FILE* file = fopen("input.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        return NULL;
    }

    char* data = calloc(1, PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        fclose(file);
        return NULL;
    }
    char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    size_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        // circular_buffer_write() adds HEADER_SIZE to the length it is given,
        // so pass the payload size only. Passing HEADER_SIZE + bytesRead made
        // it copy 16 bytes too many, past the end of `data` on a full chunk.
        circular_buffer_write(buffer, data, (int)bytesRead);
    }

    free(data);
    fclose(file);
    return NULL;
}

/*
 * Consumer thread: takes packets from the ring passed as `arg`, strips the
 * header and the trailing zero bytes, prints the remaining payload in
 * hexadecimal and sends it as one UDP datagram to 127.0.0.1:1234.
 *
 * Returns NULL in all cases; failures are reported on stdout.
 */
void* send_thread(void* arg) {
    CircularBuffer* buffer = (CircularBuffer*)arg;

    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        return NULL;
    }

    struct sockaddr_in server_address;
    // Clear the padding bytes of the address before filling it in.
    memset(&server_address, 0, sizeof(server_address));
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char* data = malloc(PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        close(udp_socket);
        return NULL;
    }

    // NOTE(review): nothing tells this loop that the reader has finished.
    // Once the ring drains, circular_buffer_read() waits forever, so only a
    // send failure ends the loop.
    while (1) {
        circular_buffer_read(buffer, data, PACKET_SIZE);

        // Recover the payload length by dropping the zero padding.
        // NOTE(review): this also drops genuine trailing 0x00 payload bytes;
        // the fixed "DATA HEADER" carries no length to tell them apart.
        int padded_data_length = PACKET_SIZE - HEADER_SIZE;
        while (padded_data_length > 0 && data[HEADER_SIZE + padded_data_length - 1] == 0) {
            padded_data_length--;
        }

        // Print the UDP sent data in hexadecimal format
        print_hex(data + HEADER_SIZE, padded_data_length);

        ssize_t bytesSent = sendto(udp_socket, data + HEADER_SIZE, padded_data_length, 0, (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            break;  // fall through to the cleanup below instead of leaking
        }
    }

    free(data);
    close(udp_socket);
    return NULL;
}

Linux C:建立一个256MB大小的一维数组环形队列,建立两个线程:一个用于读取指定文件内容存入环形队列,一个用UDP每次读取环形队列中8192字节发送到指定地址,不足8192字节时仅发送剩余数据。读取数据时最大以1024字节为一包数据读取,读取时每包数据加上自定义的16字节数据包头,作为有效数据存入环形队列;如果环形队列数据满了,等待UDP发送数据后有空闲空间,读取线程再继续读。环形队列读写、初始化环形缓冲区、销毁环形缓冲区分别写成独立函数,UDP线程将在fread读取的数据发送完成后结束所有线程。分别用16进制printf打印写入缓冲区的数据和UDP发送的数据。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>

// Ring capacity in bytes (256 MiB). The expression is parenthesised so that
// uses such as `x % BUFFER_SIZE` and `BUFFER_SIZE - n` apply to the whole
// product instead of to one factor only.
#define BUFFER_SIZE (256 * 1024 * 1024)
// Size of the sender's local buffer in bytes.
#define PACKET_SIZE 8192
// Largest record (header plus payload) the reader thread stores, in bytes.
#define MAX_PACKET_SIZE 1024
// Size in bytes of the header placed in front of every payload.
#define HEADER_SIZE 16

/*
 * Byte-oriented ring buffer with its storage embedded in the struct.
 * The read and write functions take `lock` before touching the other fields.
 */
typedef struct {
    char buffer[BUFFER_SIZE];  // inline storage; makes the struct BUFFER_SIZE bytes plus bookkeeping
    int read_index;            // offset of the next byte to read
    int write_index;           // offset of the next byte to write
    int count;                 // bytes currently stored
    pthread_mutex_t lock;      // taken by writeToCircularBuffer() and readFromCircularBuffer()
    pthread_cond_t full;       // despite the name: signalled after a write, waited on by the reader when empty
    pthread_cond_t empty;      // despite the name: signalled after a read, waited on by the writer when full
} CircularBuffer;

/* Arguments handed to readerThread(). */
typedef struct {
    CircularBuffer* buffer;  // ring the thread writes into
    FILE* file;              // already-open input file; readerThread() closes it when done
} ReaderArgs;

/* Arguments handed to udpThread(). */
typedef struct {
    CircularBuffer* buffer;    // ring the thread reads from
    struct sockaddr_in* addr;  // destination address for sendto()
} UdpArgs;

/*
 * Prepares a caller-provided ring for use: resets both indices and the byte
 * count, then initialises the mutex and the two condition variables with
 * default attributes. The storage bytes themselves are left untouched.
 */
void initCircularBuffer(CircularBuffer* buffer) {
    pthread_cond_t* conditions[] = { &buffer->full, &buffer->empty };

    buffer->read_index = buffer->write_index = buffer->count = 0;

    pthread_mutex_init(&buffer->lock, NULL);
    for (size_t i = 0; i < sizeof conditions / sizeof conditions[0]; i++) {
        pthread_cond_init(conditions[i], NULL);
    }
}

/*
 * Releases the synchronisation objects of a ring set up by
 * initCircularBuffer(). The ring's memory belongs to the caller and is not
 * freed here.
 */
void destroyCircularBuffer(CircularBuffer* buffer) {
    pthread_cond_t* conditions[] = { &buffer->full, &buffer->empty };

    pthread_mutex_destroy(&buffer->lock);
    for (size_t i = 0; i < sizeof conditions / sizeof conditions[0]; i++) {
        pthread_cond_destroy(conditions[i]);
    }
}

/*
 * Appends `size` bytes from `data` to the ring, blocking until there is room.
 *
 * A record no larger than the ring is stored in one piece while the lock is
 * held, so a reader never observes part of it. Larger input is stored in
 * ring-sized pieces. Copies that reach the end of the storage continue at
 * offset 0. `full` is signalled after each piece to wake a waiting reader.
 * A `size` of 0 or less stores nothing.
 */
void writeToCircularBuffer(CircularBuffer* buffer, const char* data, int size) {
    // Parenthesised locally so the arithmetic holds however the macro is spelled.
    const int capacity = (BUFFER_SIZE);
    const char* src = data;
    int remainingBytes = size;

    pthread_mutex_lock(&buffer->lock);
    while (remainingBytes > 0) {
        int chunk = (remainingBytes < capacity) ? remainingBytes : capacity;

        // Wait for room for the whole piece. Waiting only for "not full" and
        // then writing what fits would spin forever under the lock once the
        // free space reached zero.
        while (capacity - buffer->count < chunk) {
            pthread_cond_wait(&buffer->empty, &buffer->lock);
        }

        // Split the copy where it meets the end of the storage array.
        int untilEnd = capacity - buffer->write_index;
        int firstPart = (chunk < untilEnd) ? chunk : untilEnd;
        memcpy(buffer->buffer + buffer->write_index, src, firstPart);
        memcpy(buffer->buffer, src + firstPart, chunk - firstPart);

        buffer->write_index = (buffer->write_index + chunk) % capacity;
        buffer->count += chunk;
        src += chunk;
        remainingBytes -= chunk;

        pthread_cond_signal(&buffer->full);
    }
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Blocks until the ring holds at least one byte, then moves up to `size`
 * bytes (fewer when fewer are stored) into `dest` and signals `empty` to wake
 * a waiting writer.
 *
 * The number of bytes delivered is not reported; callers that need it must
 * know it from the data format. Copies that reach the end of the storage
 * continue at offset 0.
 */
void readFromCircularBuffer(CircularBuffer* buffer, char* dest, int size) {
    // Parenthesised locally so the arithmetic holds however the macro is spelled.
    const int capacity = (BUFFER_SIZE);

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->full, &buffer->lock);
    }

    int readSize = (size > buffer->count) ? buffer->count : size;
    if (readSize < 0) {
        readSize = 0;
    }

    // Split the copy where it meets the end of the storage array.
    int untilEnd = capacity - buffer->read_index;
    int firstPart = (readSize < untilEnd) ? readSize : untilEnd;
    memcpy(dest, buffer->buffer + buffer->read_index, firstPart);
    memcpy(dest + firstPart, buffer->buffer, readSize - firstPart);

    buffer->read_index = (buffer->read_index + readSize) % capacity;
    buffer->count -= readSize;

    pthread_cond_signal(&buffer->empty);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Producer thread: reads the file in `arg` (a ReaderArgs) in chunks of up to
 * MAX_PACKET_SIZE - HEADER_SIZE bytes and appends each chunk to the ring as
 * one record: a HEADER_SIZE-byte header followed by the payload. The header
 * starts with the payload length as a native `int`; its remaining bytes are
 * zero. Closes the file before returning NULL.
 */
void* readerThread(void* arg) {
    ReaderArgs* args = (ReaderArgs*)arg;
    CircularBuffer* buffer = args->buffer;
    FILE* file = args->file;
    // Zero-initialised so the header bytes after the length are defined.
    char packet[MAX_PACKET_SIZE] = {0};
    int bytesRead;

    while ((bytesRead = fread(packet + HEADER_SIZE, sizeof(char), MAX_PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        // Write header
        memcpy(packet, &bytesRead, sizeof(int));

        // Write to circular buffer
        writeToCircularBuffer(buffer, packet, bytesRead + HEADER_SIZE);
    }

    fclose(file);
    return NULL;
}

void* udpThread(void* arg) {
    UdpArgs* args = (UdpArgs*)arg;
    CircularBuffer* buffer = args->buffer;
    struct sockaddr_in* addr = args->addr;
    int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
    char packet[PACKET_SIZE];
    int bytesSent, dataLength;

    while (1) {
        // Read from the circular buffer
        readFromCircularBuffer(buffer, packet, PACKET_SIZE);

        // Extract data length from header
        memcpy(&dataLength, packet, sizeof(int));

        // Send UDP packet
        bytesSent = sendto(sockfd, packet + HEADER_SIZE, dataLength, 0, (struct sockaddr*)addr, sizeof(struct sockaddr));
        if (bytesSent < 0) {
            perror("Failed to send UDP packet");
            break;
        }
    }

    close(sockfd);
    pthread_exit(NULL);
}

/*
 * Prints `size` bytes of `buffer` to stdout as two-digit uppercase
 * hexadecimal values, 16 per line, followed by a final newline.
 */
void printBufferAsHex(const char* buffer, int size) {
    for (int i = 0; i < size; i++) {
        // Cast so bytes >= 0x80 are not sign-extended when promoted.
        printf("%02X ", (unsigned char)buffer[i]);
        // Break the line after every 16th byte; a bare "n" would print the
        // letter instead of a newline.
        if ((i + 1) % 16 == 0) {
            printf("\n");
        }
    }
    printf("\n");
}

// Destination UDP port. NOTE(review): no definition is visible in this
// program; 1234 is a placeholder default, replace it with the required port.
#ifndef UDP_PORT
#define UDP_PORT 1234
#endif

/*
 * Program entry point: sets up the ring, opens the input file, starts the
 * reader and UDP threads, waits for them and finally dumps the whole ring
 * storage in hexadecimal.
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE when the file cannot be opened or a
 * thread cannot be created.
 */
int main() {
    // CircularBuffer embeds BUFFER_SIZE bytes of storage, typically far more
    // than the default stack limit, so it gets static storage. That also
    // zero-initialises the bytes dumped at the end.
    static CircularBuffer buffer;
    initCircularBuffer(&buffer);

    FILE* file = fopen("path/to/file", "rb");
    if (!file) {
        perror("Failed to open file");
        return EXIT_FAILURE;
    }

    pthread_t readerThreadID, udpThreadID;

    ReaderArgs readerArgs = { .buffer = &buffer, .file = file };
    if (pthread_create(&readerThreadID, NULL, readerThread, (void*)&readerArgs) != 0) {
        perror("Failed to create reader thread");
        fclose(file);  // the reader never started, so the file is still ours
        return EXIT_FAILURE;
    }

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));  // clear the padding bytes
    addr.sin_family = AF_INET;
    addr.sin_port = htons(UDP_PORT);  // replace with the required UDP port
    inet_aton("127.0.0.1", &addr.sin_addr);  // replace with the required destination IP address

    UdpArgs udpArgs = { .buffer = &buffer, .addr = &addr };
    if (pthread_create(&udpThreadID, NULL, udpThread, (void*)&udpArgs) != 0) {
        perror("Failed to create UDP thread");
        return EXIT_FAILURE;
    }

    pthread_join(readerThreadID, NULL);
    // NOTE(review): this signal alone cannot stop the UDP thread, because
    // readFromCircularBuffer() goes back to waiting while the ring is empty.
    // The join below returns only if the UDP thread leaves its loop itself.
    pthread_cond_signal(&buffer.full);
    pthread_join(udpThreadID, NULL);

    destroyCircularBuffer(&buffer);

    // Print the ring storage in hexadecimal (all BUFFER_SIZE bytes).
    printf("缓冲区内容:\n");
    printBufferAsHex(buffer.buffer, BUFFER_SIZE);

    return EXIT_SUCCESS;
}

二维数组队列

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>

// Number of packet slots in the ring (main passes it to circularBufferInit()
// as the slot count). Parenthesised so the macro is safe inside larger
// expressions.
// NOTE(review): 256 * 1024 slots of PACKET_SIZE bytes is 2 GiB of heap. If a
// 256 MiB ring was intended, the slot count would be 32768; confirm.
#define BUFFER_SIZE (256 * 1024)
// Size in bytes of one slot (header plus payload plus zero padding).
#define PACKET_SIZE 8192
// Size in bytes of the header at the start of each slot.
#define HEADER_SIZE 16

/*
 * Ring of fixed-size packet slots shared by the reader and sender threads.
 * The read and write functions take `lock` before touching the other fields.
 */
typedef struct {
    char **buffer;             // `size` slot pointers, each to PACKET_SIZE bytes
    int *length;               // payload length recorded for each slot
    int size;                  // number of slots
    int read_idx;              // index of the next slot to read
    int write_idx;             // index of the next slot to write
    int count;                 // slots written but not yet read
    pthread_mutex_t lock;      // taken by circularBufferWrite() and circularBufferRead()
    pthread_cond_t not_full;   // signalled by circularBufferRead() after a slot is consumed
    pthread_cond_t not_empty;  // signalled by circularBufferWrite() after a slot is stored
} CircularBuffer;

/* Arguments handed to readerThread(). */
typedef struct {
    CircularBuffer *buffer;  // ring the thread writes into
    const char *filename;    // path of the file to read
} ReaderParams;

/* Arguments handed to senderThread(). */
typedef struct {
    CircularBuffer *buffer;  // ring the thread reads from
    const char *dest_ip;     // destination IPv4 address as text
    int dest_port;           // destination UDP port in host byte order
} SenderParams;

/*
 * Sets up a caller-provided ring with `size` slots of PACKET_SIZE bytes each,
 * clears the per-slot lengths, resets the indices and count, and initialises
 * the mutex and condition variables.
 *
 * The function has no way to report failure to its caller, so a non-positive
 * `size` or a failed allocation prints a message and terminates the process
 * instead of leaving NULL slots for the write path to dereference.
 */
void circularBufferInit(CircularBuffer *buffer, int size) {
    if (size <= 0) {
        // A zero-slot ring would also make the `% size` index updates divide by zero.
        fprintf(stderr, "circularBufferInit: invalid slot count %d\n", size);
        exit(EXIT_FAILURE);
    }

    buffer->buffer = malloc(sizeof *buffer->buffer * (size_t)size);
    buffer->length = malloc(sizeof *buffer->length * (size_t)size);
    if (buffer->buffer == NULL || buffer->length == NULL) {
        fprintf(stderr, "circularBufferInit: out of memory\n");
        exit(EXIT_FAILURE);
    }

    for (int i = 0; i < size; i++) {
        buffer->buffer[i] = malloc(PACKET_SIZE);
        if (buffer->buffer[i] == NULL) {
            fprintf(stderr, "circularBufferInit: out of memory at slot %d\n", i);
            exit(EXIT_FAILURE);
        }
        buffer->length[i] = 0;
    }

    buffer->size = size;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
}

/*
 * Stores one packet in the next free slot, blocking while every slot is
 * occupied.
 *
 * The slot receives a HEADER_SIZE-byte header ("Custom Header", zero-padded),
 * then `size` payload bytes from `data`, then zero padding up to PACKET_SIZE.
 * The payload length is recorded for the reader. `size` is clamped to
 * PACKET_SIZE - HEADER_SIZE so an oversized payload is truncated rather than
 * overrunning the slot. Signals not_empty afterwards.
 */
void circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) {
    // A real HEADER_SIZE-byte array: copying HEADER_SIZE bytes straight from
    // the 14-byte string literal would read past its end.
    static const char header[HEADER_SIZE] = "Custom Header";
    const size_t max_payload = PACKET_SIZE - HEADER_SIZE;

    if (size > max_payload) {
        size = max_payload;
    }

    pthread_mutex_lock(&buffer->lock);

    while (buffer->count == buffer->size) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    char *packet = buffer->buffer[buffer->write_idx];

    // Header, payload, then zero padding for the rest of the slot.
    memcpy(packet, header, HEADER_SIZE);
    memcpy(packet + HEADER_SIZE, data, size);
    memset(packet + HEADER_SIZE + size, 0, max_payload - size);

    // Record the payload length for circularBufferRead().
    buffer->length[buffer->write_idx] = (int)size;

    buffer->write_idx = (buffer->write_idx + 1) % buffer->size;
    buffer->count++;

    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Removes the oldest packet from the ring, blocking while the ring is empty,
 * and copies its payload (without header or padding) into `data`.
 *
 * size is the capacity of `data`. At most that many bytes are copied; bytes
 * of `data` beyond the payload are left untouched. The payload length is not
 * reported to the caller. Signals not_full afterwards.
 */
void circularBufferRead(CircularBuffer *buffer, void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);

    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    const char *packet = buffer->buffer[buffer->read_idx];
    size_t packet_size = (size_t)buffer->length[buffer->read_idx];

    // Never copy more than the caller's buffer can hold.
    if (packet_size > size) {
        packet_size = size;
    }

    // The zero padding after the payload is skipped, not copied.
    memcpy(data, packet + HEADER_SIZE, packet_size);

    buffer->read_idx = (buffer->read_idx + 1) % buffer->size;
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Releases everything circularBufferInit() set up: the mutex, both condition
 * variables, every slot, the slot table and the length table. The counters
 * are then reset to zero. The CircularBuffer object itself belongs to the
 * caller and is not freed.
 */
void circularBufferDestroy(CircularBuffer *buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);

    int slot = 0;
    while (slot < buffer->size) {
        free(buffer->buffer[slot]);
        slot++;
    }
    free(buffer->buffer);
    free(buffer->length);

    buffer->size = buffer->read_idx = buffer->write_idx = buffer->count = 0;
}

/*
 * Producer thread: opens the file named in `params` (a ReaderParams), reads
 * it in chunks of up to PACKET_SIZE - HEADER_SIZE bytes and hands each chunk
 * to circularBufferWrite(), sleeping 1000 microseconds after every chunk.
 * Returns NULL; an open failure is reported with perror().
 */
void *readerThread(void *params) {
    const ReaderParams *cfg = params;

    FILE *input = fopen(cfg->filename, "rb");
    if (!input) {
        perror("Failed to open file");
        return NULL;
    }

    char chunk[PACKET_SIZE];
    for (;;) {
        size_t got = fread(chunk, 1, PACKET_SIZE - HEADER_SIZE, input);
        if (got == 0) {
            break;
        }
        circularBufferWrite(cfg->buffer, chunk, got);
        usleep(1000);
    }

    fclose(input);
    return NULL;
}

#include <arpa/inet.h>  /* declares inet_pton(); this program's include list omits it */

/*
 * Creates a UDP socket connected to dest_ip:dest_port so that plain send()
 * can be used on it.
 *
 * dest_ip:   IPv4 address in dotted-decimal form (e.g. "127.0.0.1").
 * dest_port: port number, 0..65535, in host byte order.
 *
 * Returns the socket descriptor, or -1 when the address or port is invalid or
 * when socket() or connect() fails. The arguments are validated before a
 * socket is created, so nothing is leaked on bad input.
 */
int createUdpSocket(const char *dest_ip, int dest_port) {
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;

    // inet_pton() reports malformed input explicitly. inet_addr() returns
    // INADDR_NONE, which cannot be told apart from 255.255.255.255.
    if (dest_ip == NULL || inet_pton(AF_INET, dest_ip, &server_addr.sin_addr) != 1) {
        fprintf(stderr, "Invalid IPv4 address\n");
        return -1;
    }
    if (dest_port < 0 || dest_port > 65535) {
        fprintf(stderr, "Invalid UDP port %d\n", dest_port);
        return -1;
    }
    server_addr.sin_port = htons((unsigned short)dest_port);

    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0) {
        perror("Failed to create UDP socket");
        return -1;
    }

    if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("Failed to connect to UDP server");
        close(sock);
        return -1;
    }

    return sock;
}

/*
 * Consumer thread: connects a UDP socket to the destination in `params` (a
 * SenderParams), then repeatedly takes one payload from the ring and sends it
 * as a datagram of PACKET_SIZE - HEADER_SIZE bytes, sleeping 1000
 * microseconds between packets. Returns NULL.
 */
void *senderThread(void *params) {
    SenderParams *senderParams = (SenderParams*)params;

    int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port);
    if (sock < 0) {
        // createUdpSocket() returns -1 on failure; sending on it could never work.
        return NULL;
    }

    char data[PACKET_SIZE];
    // NOTE(review): nothing tells this loop that the reader has finished.
    // Once the ring drains, circularBufferRead() waits forever.
    while (1) {
        // circularBufferRead() copies only the stored payload and leaves the
        // rest of `data` untouched. Clear it first so a short packet is
        // zero-padded instead of carrying stale or uninitialised stack bytes.
        memset(data, 0, sizeof(data));
        circularBufferRead(senderParams->buffer, data, PACKET_SIZE - HEADER_SIZE);

        // Best effort: report a failed send and keep going.
        if (send(sock, data, PACKET_SIZE - HEADER_SIZE, 0) < 0) {
            perror("Failed to send UDP packet");
        }
        usleep(1000);
    }

    close(sock);

    return NULL;
}

/*
 * Program entry point: sets up the slot ring, starts the file-reader and
 * UDP-sender threads with their parameters, waits for both and releases the
 * ring.
 *
 * Returns 0 once both threads have been joined, or EXIT_FAILURE when a thread
 * could not be created.
 */
int main() {
    CircularBuffer buffer;
    circularBufferInit(&buffer, BUFFER_SIZE);

    ReaderParams readerParams;
    readerParams.buffer = &buffer;
    readerParams.filename = "input.txt";

    SenderParams senderParams;
    senderParams.buffer = &buffer;
    senderParams.dest_ip = "127.0.0.1";  // destination address
    senderParams.dest_port = 12345;  // destination port

    pthread_t readerThreadId;
    pthread_t senderThreadId;

    // pthread_create() reports failure through its return value; joining a
    // thread that was never created is undefined, so stop here instead.
    if (pthread_create(&readerThreadId, NULL, readerThread, &readerParams) != 0) {
        fprintf(stderr, "Failed to create reader thread\n");
        circularBufferDestroy(&buffer);
        return EXIT_FAILURE;
    }
    if (pthread_create(&senderThreadId, NULL, senderThread, &senderParams) != 0) {
        fprintf(stderr, "Failed to create sender thread\n");
        // The reader thread is running and still uses the ring, so it is not
        // destroyed here; returning from main ends the whole process.
        return EXIT_FAILURE;
    }

    pthread_join(readerThreadId, NULL);
    pthread_join(senderThreadId, NULL);

    circularBufferDestroy(&buffer);

    return 0;
}

备注

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>

// Number of packet slots in the ring (main passes it to circularBufferInit()
// as the slot count). Parenthesised so the macro is safe inside larger
// expressions.
// NOTE(review): 256 * 1024 slots of PACKET_SIZE bytes is 2 GiB of heap. If a
// 256 MiB ring was intended, the slot count would be 32768; confirm.
#define BUFFER_SIZE (256 * 1024)
// Size in bytes of one slot (header plus payload plus zero padding).
#define PACKET_SIZE 8192
// Size in bytes of the header at the start of each slot.
#define HEADER_SIZE 16

/*
 * Ring of fixed-size packet slots shared by the reader and sender threads.
 * The read and write functions take `lock` before touching the other fields.
 */
typedef struct {
    char **buffer;             // `size` slot pointers, each to PACKET_SIZE bytes
    int *length;               // payload length recorded for each slot
    int size;                  // number of slots
    int read_idx;              // index of the next slot to read
    int write_idx;             // index of the next slot to write
    int count;                 // slots written but not yet read
    pthread_mutex_t lock;      // taken by circularBufferWrite() and circularBufferRead()
    pthread_cond_t not_full;   // signalled by circularBufferRead() after a slot is consumed
    pthread_cond_t not_empty;  // signalled by circularBufferWrite() after a slot is stored
} CircularBuffer;

/* Arguments handed to readerThread(). */
typedef struct {
    CircularBuffer *buffer;  // ring the thread writes into
    const char *filename;    // path of the file to read
} ReaderParams;

/* Arguments handed to senderThread(). */
typedef struct {
    CircularBuffer *buffer;  // ring the thread reads from
    const char *dest_ip;     // destination IPv4 address as text
    int dest_port;           // destination UDP port in host byte order
} SenderParams;

/*
 * Sets up a caller-provided ring with `size` slots of PACKET_SIZE bytes each,
 * clears the per-slot lengths, resets the indices and count, and initialises
 * the mutex and condition variables.
 *
 * The function has no way to report failure to its caller, so a non-positive
 * `size` or a failed allocation prints a message and terminates the process
 * instead of leaving NULL slots for the write path to dereference.
 */
void circularBufferInit(CircularBuffer *buffer, int size) {
    if (size <= 0) {
        // A zero-slot ring would also make the `% size` index updates divide by zero.
        fprintf(stderr, "circularBufferInit: invalid slot count %d\n", size);
        exit(EXIT_FAILURE);
    }

    buffer->buffer = malloc(sizeof *buffer->buffer * (size_t)size);
    buffer->length = malloc(sizeof *buffer->length * (size_t)size);
    if (buffer->buffer == NULL || buffer->length == NULL) {
        fprintf(stderr, "circularBufferInit: out of memory\n");
        exit(EXIT_FAILURE);
    }

    for (int i = 0; i < size; i++) {
        buffer->buffer[i] = malloc(PACKET_SIZE);
        if (buffer->buffer[i] == NULL) {
            fprintf(stderr, "circularBufferInit: out of memory at slot %d\n", i);
            exit(EXIT_FAILURE);
        }
        buffer->length[i] = 0;
    }

    buffer->size = size;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
}

/*
 * Stores one packet in the next free slot, blocking while every slot is
 * occupied.
 *
 * This single definition replaces two copies of the same function; defining
 * it twice is a redefinition error. The explanatory notes of the annotated
 * copy are kept below.
 *
 * The slot receives a HEADER_SIZE-byte header ("Custom Header", zero-padded),
 * then `size` payload bytes from `data`, then zero padding up to PACKET_SIZE.
 * The payload length is recorded for the reader. `size` is clamped to
 * PACKET_SIZE - HEADER_SIZE so an oversized payload is truncated rather than
 * overrunning the slot.
 */
void circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) {
    // A real HEADER_SIZE-byte array: copying HEADER_SIZE bytes straight from
    // the 14-byte string literal would read past its end.
    static const char header[HEADER_SIZE] = "Custom Header";
    const size_t max_payload = PACKET_SIZE - HEADER_SIZE;

    if (size > max_payload) {
        size = max_payload;
    }

    pthread_mutex_lock(&buffer->lock);

    // Full check: buffer->count is the number of packets currently stored and
    // buffer->size is the maximum number the ring can hold. While they are
    // equal the ring is full and the writer waits on not_full.
    while (buffer->count == buffer->size) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }

    // buffer->buffer is an array of slot pointers (char **); the entry at
    // write_idx is the slot that receives this packet.
    char *packet = buffer->buffer[buffer->write_idx];

    // Header, payload, then zero padding. The padding length is greater than
    // zero when this packet's payload does not fill the slot.
    memcpy(packet, header, HEADER_SIZE);
    memcpy(packet + HEADER_SIZE, data, size);
    memset(packet + HEADER_SIZE + size, 0, max_payload - size);

    // Record the payload length for circularBufferRead().
    buffer->length[buffer->write_idx] = (int)size;

    buffer->write_idx = (buffer->write_idx + 1) % buffer->size;
    buffer->count++;

    // Tell a thread waiting in circularBufferRead() that the ring is no
    // longer empty, then release the lock: the write is complete.
    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Removes the oldest packet from the ring, blocking while the ring is empty,
 * and copies its payload (without header or padding) into `data`.
 *
 * size is the capacity of `data`. At most that many bytes are copied; bytes
 * of `data` beyond the payload are left untouched. The payload length is not
 * reported to the caller. Signals not_full afterwards.
 */
void circularBufferRead(CircularBuffer *buffer, void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);

    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }

    const char *packet = buffer->buffer[buffer->read_idx];
    size_t packet_size = (size_t)buffer->length[buffer->read_idx];

    // Never copy more than the caller's buffer can hold.
    if (packet_size > size) {
        packet_size = size;
    }

    // The zero padding after the payload is skipped, not copied.
    memcpy(data, packet + HEADER_SIZE, packet_size);

    buffer->read_idx = (buffer->read_idx + 1) % buffer->size;
    buffer->count--;

    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Releases everything circularBufferInit() set up: the mutex, both condition
 * variables, every slot, the slot table and the length table. The counters
 * are then reset to zero. The CircularBuffer object itself belongs to the
 * caller and is not freed.
 */
void circularBufferDestroy(CircularBuffer *buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);

    int slot = 0;
    while (slot < buffer->size) {
        free(buffer->buffer[slot]);
        slot++;
    }
    free(buffer->buffer);
    free(buffer->length);

    buffer->size = buffer->read_idx = buffer->write_idx = buffer->count = 0;
}

/*
 * Producer thread: opens the file named in `params` (a ReaderParams), reads
 * it in chunks of up to PACKET_SIZE - HEADER_SIZE bytes and hands each chunk
 * to circularBufferWrite(), sleeping 1000 microseconds after every chunk.
 * Returns NULL; an open failure is reported with perror().
 */
void *readerThread(void *params) {
    const ReaderParams *cfg = params;

    FILE *input = fopen(cfg->filename, "rb");
    if (!input) {
        perror("Failed to open file");
        return NULL;
    }

    char chunk[PACKET_SIZE];
    for (;;) {
        size_t got = fread(chunk, 1, PACKET_SIZE - HEADER_SIZE, input);
        if (got == 0) {
            break;
        }
        circularBufferWrite(cfg->buffer, chunk, got);
        usleep(1000);
    }

    fclose(input);
    return NULL;
}

#include <arpa/inet.h>  /* declares inet_pton(); this program's include list omits it */

/*
 * Creates a UDP socket connected to dest_ip:dest_port so that plain send()
 * can be used on it.
 *
 * dest_ip:   IPv4 address in dotted-decimal form (e.g. "127.0.0.1").
 * dest_port: port number, 0..65535, in host byte order.
 *
 * Returns the socket descriptor, or -1 when the address or port is invalid or
 * when socket() or connect() fails. The arguments are validated before a
 * socket is created, so nothing is leaked on bad input.
 */
int createUdpSocket(const char *dest_ip, int dest_port) {
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;

    // inet_pton() reports malformed input explicitly. inet_addr() returns
    // INADDR_NONE, which cannot be told apart from 255.255.255.255.
    if (dest_ip == NULL || inet_pton(AF_INET, dest_ip, &server_addr.sin_addr) != 1) {
        fprintf(stderr, "Invalid IPv4 address\n");
        return -1;
    }
    if (dest_port < 0 || dest_port > 65535) {
        fprintf(stderr, "Invalid UDP port %d\n", dest_port);
        return -1;
    }
    server_addr.sin_port = htons((unsigned short)dest_port);

    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0) {
        perror("Failed to create UDP socket");
        return -1;
    }

    if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("Failed to connect to UDP server");
        close(sock);
        return -1;
    }

    return sock;
}

/*
 * Consumer thread: connects a UDP socket to the destination in `params` (a
 * SenderParams), then repeatedly takes one payload from the ring and sends it
 * as a datagram of PACKET_SIZE - HEADER_SIZE bytes, sleeping 1000
 * microseconds between packets. Returns NULL.
 */
void *senderThread(void *params) {
    SenderParams *senderParams = (SenderParams*)params;

    int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port);
    if (sock < 0) {
        // createUdpSocket() returns -1 on failure; sending on it could never work.
        return NULL;
    }

    char data[PACKET_SIZE];
    // NOTE(review): nothing tells this loop that the reader has finished.
    // Once the ring drains, circularBufferRead() waits forever.
    while (1) {
        // circularBufferRead() copies only the stored payload and leaves the
        // rest of `data` untouched. Clear it first so a short packet is
        // zero-padded instead of carrying stale or uninitialised stack bytes.
        memset(data, 0, sizeof(data));
        circularBufferRead(senderParams->buffer, data, PACKET_SIZE - HEADER_SIZE);

        // Best effort: report a failed send and keep going.
        if (send(sock, data, PACKET_SIZE - HEADER_SIZE, 0) < 0) {
            perror("Failed to send UDP packet");
        }
        usleep(1000);
    }

    close(sock);

    return NULL;
}

/*
 * Program entry point: sets up the slot ring, starts the file-reader and
 * UDP-sender threads with their parameters, waits for both and releases the
 * ring.
 *
 * Returns 0 once both threads have been joined, or EXIT_FAILURE when a thread
 * could not be created.
 */
int main() {
    CircularBuffer buffer;
    circularBufferInit(&buffer, BUFFER_SIZE);

    ReaderParams readerParams;
    readerParams.buffer = &buffer;
    readerParams.filename = "input.txt";

    SenderParams senderParams;
    senderParams.buffer = &buffer;
    senderParams.dest_ip = "127.0.0.1";  // destination address
    senderParams.dest_port = 12345;  // destination port

    pthread_t readerThreadId;
    pthread_t senderThreadId;

    // pthread_create() reports failure through its return value; joining a
    // thread that was never created is undefined, so stop here instead.
    if (pthread_create(&readerThreadId, NULL, readerThread, &readerParams) != 0) {
        fprintf(stderr, "Failed to create reader thread\n");
        circularBufferDestroy(&buffer);
        return EXIT_FAILURE;
    }
    if (pthread_create(&senderThreadId, NULL, senderThread, &senderParams) != 0) {
        fprintf(stderr, "Failed to create sender thread\n");
        // The reader thread is running and still uses the ring, so it is not
        // destroyed here; returning from main ends the whole process.
        return EXIT_FAILURE;
    }

    pthread_join(readerThreadId, NULL);
    pthread_join(senderThreadId, NULL);

    circularBufferDestroy(&buffer);

    return 0;
}