c语言环形队列
一维数组队列
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
/* Ring capacity in bytes (256 MiB). Parenthesised so the macro stays a single
 * operand inside expressions: with the bare `256 * 1024 * 1024` form,
 * `x % BUFFER_SIZE` parsed as `(x % 256) * 1024 * 1024`. */
#define BUFFER_SIZE (256 * 1024 * 1024)
/* Bytes occupied by one fixed-size packet slot in the ring. */
#define PACKET_SIZE 8192
/* Bytes reserved for the header at the start of every packet. */
#define HEADER_SIZE 16
/* Ring of fixed-size packets shared by the reader and sender threads.
 * The read/write functions touch the indices and the count only while
 * holding `lock`. */
typedef struct {
    char *buffer;             /* backing storage, allocated in circular_buffer_init() */
    int read_idx;             /* byte offset of the next packet to read */
    int write_idx;            /* byte offset of the next packet to write */
    int count;                /* number of packets currently stored */
    pthread_mutex_t lock;     /* protects the fields above */
    pthread_cond_t not_full;  /* signalled after a packet has been removed */
    pthread_cond_t not_empty; /* signalled after a packet has been added */
} CircularBuffer;
/* Ring lifecycle and packet transfer. */
CircularBuffer *circular_buffer_init();
void circular_buffer_write(CircularBuffer *buffer, const char *data, int length);
void circular_buffer_read(CircularBuffer *buffer, char *data, int length);
void circular_buffer_destroy(CircularBuffer *buffer);

/* Thread entry points; each one receives the shared CircularBuffer as `arg`. */
void *read_thread(void *arg);
void *send_thread(void *arg);
/*
 * Entry point: creates the shared ring, starts the file-reader and UDP-sender
 * threads, waits for both, then releases the ring.
 *
 * Returns 0 on success and 1 when the ring or a thread cannot be created.
 *
 * Note: send_thread only leaves its loop on an error, so the second join
 * normally does not return even after the whole file has been queued.
 */
int main() {
    CircularBuffer *buffer = circular_buffer_init();
    if (buffer == NULL) {
        fprintf(stderr, "Failed to allocate circular buffer.\n");
        return 1;
    }

    pthread_t thread1, thread2;
    if (pthread_create(&thread1, NULL, read_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create reader thread.\n");
        circular_buffer_destroy(buffer);
        return 1;
    }
    if (pthread_create(&thread2, NULL, send_thread, buffer) != 0) {
        /* The reader is already running and still uses the ring, so the ring
         * is left untouched; returning from main ends the whole process. */
        fprintf(stderr, "Failed to create sender thread.\n");
        return 1;
    }

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
    circular_buffer_destroy(buffer);
    return 0;
}
/*
 * Allocate and initialise an empty ring.
 *
 * Returns a heap-allocated CircularBuffer with BUFFER_SIZE bytes of storage,
 * or NULL when either allocation fails (nothing is leaked in that case).
 * Release the result with circular_buffer_destroy().
 */
CircularBuffer* circular_buffer_init() {
    CircularBuffer *ring = malloc(sizeof *ring);
    if (ring == NULL) {
        return NULL;
    }
    ring->buffer = malloc(BUFFER_SIZE);
    if (ring->buffer == NULL) {
        free(ring);
        return NULL;
    }
    ring->read_idx = 0;
    ring->write_idx = 0;
    ring->count = 0;
    pthread_mutex_init(&ring->lock, NULL);
    pthread_cond_init(&ring->not_full, NULL);
    pthread_cond_init(&ring->not_empty, NULL);
    return ring;
}
/*
 * Append one packet to the ring, blocking while every slot is occupied.
 *
 * buffer: ring created by circular_buffer_init().
 * data:   packet bytes, header first.
 * length: number of bytes available at `data`, header included (this is what
 *         the callers in this file pass). Values above PACKET_SIZE are
 *         clamped; a shorter packet is zero-padded to a full slot.
 *
 * Every packet occupies exactly PACKET_SIZE bytes and `count` counts packets,
 * so the ring is full at BUFFER_SIZE / PACKET_SIZE packets, not at
 * BUFFER_SIZE.
 */
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    const int slot_count = (BUFFER_SIZE) / PACKET_SIZE;

    if (length < 0) {
        length = 0;
    }
    if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count >= slot_count) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }
    char *slot = buffer->buffer + buffer->write_idx;
    memcpy(slot, data, (size_t)length);
    memset(slot + length, 0, (size_t)(PACKET_SIZE - length));
    /* Parentheses keep the modulo correct even if BUFFER_SIZE is defined as a
     * bare product. */
    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count++;
    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Remove the oldest packet from the ring, blocking while the ring is empty.
 *
 * buffer: ring created by circular_buffer_init().
 * data:   destination for the packet bytes.
 * length: number of bytes to copy out; clamped to PACKET_SIZE so the copy
 *         never runs into the following slot.
 */
void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    if (length < 0) {
        length = 0;
    }
    if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }
    memcpy(data, buffer->buffer + buffer->read_idx, (size_t)length);
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count--;
    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}
/* Tear down a ring created by circular_buffer_init(): destroys both
 * condition variables and the mutex, then frees the storage and the struct.
 * The pointer must not be used afterwards. */
void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_cond_destroy(&buffer->not_empty);
    pthread_cond_destroy(&buffer->not_full);
    pthread_mutex_destroy(&buffer->lock);

    char *storage = buffer->buffer;
    free(storage);
    free(buffer);
}
/*
 * Producer thread: reads "input.txt" in payload-sized chunks and queues each
 * chunk, prefixed with a fixed 16-byte header, as one PACKET_SIZE packet.
 *
 * arg: the shared CircularBuffer.
 * Always finishes with pthread_exit(NULL).
 */
void* read_thread(void* arg) {
    CircularBuffer *ring = (CircularBuffer*)arg;
    const size_t payload_cap = PACKET_SIZE - HEADER_SIZE;

    FILE *file = fopen("input.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        pthread_exit(NULL);
    }
    char *data = calloc(1, PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        fclose(file);
        pthread_exit(NULL);
    }
    /* The initializer zero-fills the bytes after the text, so exactly
     * HEADER_SIZE defined bytes are copied. */
    const char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    size_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, payload_cap, file)) > 0) {
        /* A short (final) read leaves bytes of the previous chunk behind;
         * clear them so the packet carries only zero padding. */
        memset(data + HEADER_SIZE + bytesRead, 0, payload_cap - bytesRead);
        circular_buffer_write(ring, data, PACKET_SIZE);
    }
    free(data);
    fclose(file);
    pthread_exit(NULL);
}
/*
 * Consumer thread: takes packets from the ring and sends each one as a UDP
 * datagram of PACKET_SIZE bytes to 127.0.0.1:1234.
 *
 * arg: the shared CircularBuffer.
 *
 * The loop ends only when a send fails; the ring carries no end-of-data
 * marker, so after the last packet the thread blocks in
 * circular_buffer_read(). Always finishes with pthread_exit(NULL).
 */
void* send_thread(void* arg) {
    CircularBuffer *ring = (CircularBuffer*)arg;
    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        pthread_exit(NULL);
    }

    struct sockaddr_in server_address;
    /* Zero first so members that are not assigned below are well defined. */
    memset(&server_address, 0, sizeof server_address);
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char *data = malloc(PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        close(udp_socket);
        pthread_exit(NULL);
    }

    for (;;) {
        circular_buffer_read(ring, data, PACKET_SIZE);
        ssize_t bytesSent = sendto(udp_socket, data, PACKET_SIZE, 0,
                                   (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            break; /* leave through the cleanup below instead of leaking */
        }
    }
    free(data);
    close(udp_socket);
    pthread_exit(NULL);
}
全部打印
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
/* Ring capacity in bytes (256 MiB). Parenthesised so the macro stays a single
 * operand inside expressions: with the bare `256 * 1024 * 1024` form,
 * `x % BUFFER_SIZE` parsed as `(x % 256) * 1024 * 1024`. */
#define BUFFER_SIZE (256 * 1024 * 1024)
/* Bytes occupied by one fixed-size packet slot in the ring. */
#define PACKET_SIZE 8192
/* Bytes reserved for the header at the start of every packet. */
#define HEADER_SIZE 16
/* Ring of fixed-size packets shared by the reader and sender threads.
 * The read/write functions touch the indices and the count only while
 * holding `lock`. */
typedef struct {
    char *buffer;             /* backing storage, allocated in circular_buffer_init() */
    int read_idx;             /* byte offset of the next packet to read */
    int write_idx;            /* byte offset of the next packet to write */
    int count;                /* number of packets currently stored */
    pthread_mutex_t lock;     /* protects the fields above */
    pthread_cond_t not_full;  /* signalled after a packet has been removed */
    pthread_cond_t not_empty; /* signalled after a packet has been added */
} CircularBuffer;
/* Ring lifecycle and packet transfer. */
CircularBuffer *circular_buffer_init();
void circular_buffer_write(CircularBuffer *buffer, const char *data, int length);
void circular_buffer_read(CircularBuffer *buffer, char *data, int length);
void circular_buffer_destroy(CircularBuffer *buffer);

/* Hex dump of `length` bytes to stdout. */
void print_hex(const char *data, int length);

/* Thread entry points; each one receives the shared CircularBuffer as `arg`. */
void *read_thread(void *arg);
void *send_thread(void *arg);
/*
 * Entry point: creates the shared ring, starts the file-reader and UDP-sender
 * threads, waits for both, then releases the ring.
 *
 * Returns 0 on success and 1 when the ring or a thread cannot be created.
 *
 * Note: send_thread only leaves its loop on an error, so the second join
 * normally does not return even after the whole file has been queued.
 */
int main() {
    CircularBuffer *buffer = circular_buffer_init();
    if (buffer == NULL) {
        fprintf(stderr, "Failed to allocate circular buffer.\n");
        return 1;
    }

    pthread_t thread1, thread2;
    if (pthread_create(&thread1, NULL, read_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create reader thread.\n");
        circular_buffer_destroy(buffer);
        return 1;
    }
    if (pthread_create(&thread2, NULL, send_thread, buffer) != 0) {
        /* The reader is already running and still uses the ring, so the ring
         * is left untouched; returning from main ends the whole process. */
        fprintf(stderr, "Failed to create sender thread.\n");
        return 1;
    }

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
    circular_buffer_destroy(buffer);
    return 0;
}
/*
 * Allocate and initialise an empty ring.
 *
 * Returns a heap-allocated CircularBuffer with BUFFER_SIZE bytes of storage,
 * or NULL when either allocation fails (nothing is leaked in that case).
 * Release the result with circular_buffer_destroy().
 */
CircularBuffer* circular_buffer_init() {
    CircularBuffer *ring = malloc(sizeof *ring);
    if (ring == NULL) {
        return NULL;
    }
    ring->buffer = malloc(BUFFER_SIZE);
    if (ring->buffer == NULL) {
        free(ring);
        return NULL;
    }
    ring->read_idx = 0;
    ring->write_idx = 0;
    ring->count = 0;
    pthread_mutex_init(&ring->lock, NULL);
    pthread_cond_init(&ring->not_full, NULL);
    pthread_cond_init(&ring->not_empty, NULL);
    return ring;
}
/*
 * Append one packet to the ring, blocking while every slot is occupied, then
 * dump the written bytes with print_hex().
 *
 * buffer: ring created by circular_buffer_init().
 * data:   packet bytes, header first.
 * length: number of bytes available at `data`, header included (this is what
 *         the callers in this file pass). Values above PACKET_SIZE are
 *         clamped; a shorter packet is zero-padded to a full slot.
 *
 * Every packet occupies exactly PACKET_SIZE bytes and `count` counts packets,
 * so the ring is full at BUFFER_SIZE / PACKET_SIZE packets, not at
 * BUFFER_SIZE.
 */
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    const int slot_count = (BUFFER_SIZE) / PACKET_SIZE;

    if (length < 0) {
        length = 0;
    }
    if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count >= slot_count) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }
    char *slot = buffer->buffer + buffer->write_idx;
    memcpy(slot, data, (size_t)length);
    memset(slot + length, 0, (size_t)(PACKET_SIZE - length));
    /* Parentheses keep the modulo correct even if BUFFER_SIZE is defined as a
     * bare product. */
    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count++;
    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);

    /* Print the written data in hexadecimal format (outside the lock). */
    print_hex(data, length);
}

/*
 * Remove the oldest packet from the ring, blocking while the ring is empty,
 * then dump the bytes that were copied out with print_hex().
 *
 * buffer: ring created by circular_buffer_init().
 * data:   destination for the packet bytes.
 * length: number of bytes to copy out; clamped to PACKET_SIZE so the copy
 *         never runs into the following slot.
 */
void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    if (length < 0) {
        length = 0;
    }
    if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }
    memcpy(data, buffer->buffer + buffer->read_idx, (size_t)length);
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count--;
    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);

    /* Print the read data in hexadecimal format (outside the lock). */
    print_hex(data, length);
}
/* Tear down a ring created by circular_buffer_init(): destroys both
 * condition variables and the mutex, then frees the storage and the struct.
 * The pointer must not be used afterwards. */
void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_cond_destroy(&buffer->not_empty);
    pthread_cond_destroy(&buffer->not_full);
    pthread_mutex_destroy(&buffer->lock);

    char *storage = buffer->buffer;
    free(storage);
    free(buffer);
}
/* Print `length` bytes of `data` to stdout as two-digit uppercase hex values
 * separated by spaces, prefixed with "Data: " and terminated by a newline. */
void print_hex(const char* data, int length) {
    printf("Data: ");
    for (int i = 0; i < length; i++) {
        printf("%02X ", (unsigned char)data[i]);
    }
    /* Was printf("n"): the backslash of the escape was missing, so a literal
     * 'n' was printed instead of ending the line. */
    printf("\n");
}
/*
 * Producer thread: reads "./123.txt" in payload-sized chunks and queues each
 * chunk, prefixed with a fixed 16-byte header, as one PACKET_SIZE packet.
 *
 * arg: the shared CircularBuffer.
 * Always finishes with pthread_exit(NULL).
 */
void* read_thread(void* arg) {
    CircularBuffer *ring = (CircularBuffer*)arg;
    const size_t payload_cap = PACKET_SIZE - HEADER_SIZE;

    FILE *file = fopen("./123.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        pthread_exit(NULL);
    }
    char *data = calloc(1, PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        fclose(file);
        pthread_exit(NULL);
    }
    /* The initializer zero-fills the bytes after the text, so exactly
     * HEADER_SIZE defined bytes are copied. */
    const char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    size_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, payload_cap, file)) > 0) {
        /* A short (final) read leaves bytes of the previous chunk behind;
         * clear them so the packet carries only zero padding. */
        memset(data + HEADER_SIZE + bytesRead, 0, payload_cap - bytesRead);
        circular_buffer_write(ring, data, PACKET_SIZE);
    }
    free(data);
    fclose(file);
    pthread_exit(NULL);
}
/*
 * Consumer thread: takes packets from the ring, sends each one as a UDP
 * datagram of PACKET_SIZE bytes to 127.0.0.1:1234 and hex-dumps what was
 * sent.
 *
 * arg: the shared CircularBuffer.
 *
 * The loop ends only when a send fails; the ring carries no end-of-data
 * marker, so after the last packet the thread blocks in
 * circular_buffer_read(). Always finishes with pthread_exit(NULL).
 */
void* send_thread(void* arg) {
    CircularBuffer *ring = (CircularBuffer*)arg;
    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        pthread_exit(NULL);
    }

    struct sockaddr_in server_address;
    /* Zero first so members that are not assigned below are well defined. */
    memset(&server_address, 0, sizeof server_address);
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char *data = malloc(PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        close(udp_socket);
        pthread_exit(NULL);
    }

    for (;;) {
        circular_buffer_read(ring, data, PACKET_SIZE);
        ssize_t bytesSent = sendto(udp_socket, data, PACKET_SIZE, 0,
                                   (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            break; /* leave through the cleanup below instead of leaking */
        }
        /* Print the UDP sent data in hexadecimal format. */
        print_hex(data, PACKET_SIZE);
    }
    free(data);
    close(udp_socket);
    pthread_exit(NULL);
}
只打印发送
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
/* Ring capacity in bytes (256 MiB). Parenthesised so the macro stays a single
 * operand inside expressions: with the bare `256 * 1024 * 1024` form,
 * `x % BUFFER_SIZE` parsed as `(x % 256) * 1024 * 1024`. */
#define BUFFER_SIZE (256 * 1024 * 1024)
/* Bytes occupied by one fixed-size packet slot in the ring. */
#define PACKET_SIZE 8192
/* Bytes reserved for the header at the start of every packet. */
#define HEADER_SIZE 16
/* Ring of fixed-size packets shared by the reader and sender threads.
 * The read/write functions touch the indices and the count only while
 * holding `lock`. */
typedef struct {
    char *buffer;             /* backing storage, allocated in circular_buffer_init() */
    int read_idx;             /* byte offset of the next packet to read */
    int write_idx;            /* byte offset of the next packet to write */
    int count;                /* number of packets currently stored */
    pthread_mutex_t lock;     /* protects the fields above */
    pthread_cond_t not_full;  /* signalled after a packet has been removed */
    pthread_cond_t not_empty; /* signalled after a packet has been added */
} CircularBuffer;
/* Ring lifecycle and packet transfer. */
CircularBuffer *circular_buffer_init();
void circular_buffer_write(CircularBuffer *buffer, const char *data, int length);
void circular_buffer_read(CircularBuffer *buffer, char *data, int length);
void circular_buffer_destroy(CircularBuffer *buffer);

/* Hex dump of `length` bytes to stdout. */
void print_hex(const char *data, int length);

/* Thread entry points; each one receives the shared CircularBuffer as `arg`. */
void *read_thread(void *arg);
void *send_thread(void *arg);
/*
 * Entry point: creates the shared ring, starts the file-reader and UDP-sender
 * threads, waits for both, then releases the ring.
 *
 * Returns 0 on success and 1 when the ring or a thread cannot be created.
 *
 * Note: send_thread only leaves its loop on an error, so the second join
 * normally does not return even after the whole file has been queued.
 */
int main() {
    CircularBuffer *buffer = circular_buffer_init();
    if (buffer == NULL) {
        fprintf(stderr, "Failed to allocate circular buffer.\n");
        return 1;
    }

    pthread_t thread1, thread2;
    if (pthread_create(&thread1, NULL, read_thread, buffer) != 0) {
        fprintf(stderr, "Failed to create reader thread.\n");
        circular_buffer_destroy(buffer);
        return 1;
    }
    if (pthread_create(&thread2, NULL, send_thread, buffer) != 0) {
        /* The reader is already running and still uses the ring, so the ring
         * is left untouched; returning from main ends the whole process. */
        fprintf(stderr, "Failed to create sender thread.\n");
        return 1;
    }

    pthread_join(thread1, NULL);
    pthread_join(thread2, NULL);
    circular_buffer_destroy(buffer);
    return 0;
}
/*
 * Allocate and initialise an empty ring.
 *
 * Returns a heap-allocated CircularBuffer with BUFFER_SIZE bytes of storage,
 * or NULL when either allocation fails (nothing is leaked in that case).
 * Release the result with circular_buffer_destroy().
 */
CircularBuffer* circular_buffer_init() {
    CircularBuffer *ring = malloc(sizeof *ring);
    if (ring == NULL) {
        return NULL;
    }
    ring->buffer = malloc(BUFFER_SIZE);
    if (ring->buffer == NULL) {
        free(ring);
        return NULL;
    }
    ring->read_idx = 0;
    ring->write_idx = 0;
    ring->count = 0;
    pthread_mutex_init(&ring->lock, NULL);
    pthread_cond_init(&ring->not_full, NULL);
    pthread_cond_init(&ring->not_empty, NULL);
    return ring;
}
/*
 * Append one packet to the ring, blocking while every slot is occupied.
 *
 * buffer: ring created by circular_buffer_init().
 * data:   packet bytes, header first.
 * length: number of bytes available at `data`, header included (this is what
 *         the callers in this file pass). Values above PACKET_SIZE are
 *         clamped; a shorter packet is zero-padded to a full slot.
 *
 * Every packet occupies exactly PACKET_SIZE bytes and `count` counts packets,
 * so the ring is full at BUFFER_SIZE / PACKET_SIZE packets, not at
 * BUFFER_SIZE.
 */
void circular_buffer_write(CircularBuffer* buffer, const char* data, int length) {
    const int slot_count = (BUFFER_SIZE) / PACKET_SIZE;

    if (length < 0) {
        length = 0;
    }
    if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count >= slot_count) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }
    char *slot = buffer->buffer + buffer->write_idx;
    memcpy(slot, data, (size_t)length);
    memset(slot + length, 0, (size_t)(PACKET_SIZE - length));
    /* Parentheses keep the modulo correct even if BUFFER_SIZE is defined as a
     * bare product. */
    buffer->write_idx = (buffer->write_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count++;
    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Remove the oldest packet from the ring, blocking while the ring is empty.
 *
 * buffer: ring created by circular_buffer_init().
 * data:   destination for the packet bytes.
 * length: number of bytes to copy out; clamped to PACKET_SIZE so the copy
 *         never runs into the following slot.
 */
void circular_buffer_read(CircularBuffer* buffer, char* data, int length) {
    if (length < 0) {
        length = 0;
    }
    if (length > PACKET_SIZE) {
        length = PACKET_SIZE;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }
    memcpy(data, buffer->buffer + buffer->read_idx, (size_t)length);
    buffer->read_idx = (buffer->read_idx + PACKET_SIZE) % (BUFFER_SIZE);
    buffer->count--;
    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}
/* Tear down a ring created by circular_buffer_init(): destroys both
 * condition variables and the mutex, then frees the storage and the struct.
 * The pointer must not be used afterwards. */
void circular_buffer_destroy(CircularBuffer* buffer) {
    pthread_cond_destroy(&buffer->not_empty);
    pthread_cond_destroy(&buffer->not_full);
    pthread_mutex_destroy(&buffer->lock);

    char *storage = buffer->buffer;
    free(storage);
    free(buffer);
}
/* Print `length` bytes of `data` to stdout as two-digit uppercase hex values
 * separated by spaces, prefixed with "Data: " and terminated by a newline. */
void print_hex(const char* data, int length) {
    printf("Data: ");
    for (int i = 0; i < length; i++) {
        printf("%02X ", (unsigned char)data[i]);
    }
    /* Was printf("n"): the backslash of the escape was missing, so a literal
     * 'n' was printed instead of ending the line. */
    printf("\n");
}
/*
 * Producer thread: reads "input.txt" in payload-sized chunks and queues each
 * chunk, prefixed with a fixed 16-byte header. The length handed to
 * circular_buffer_write() is header plus the bytes actually read.
 *
 * arg: the shared CircularBuffer.
 * Always finishes with pthread_exit(NULL).
 */
void* read_thread(void* arg) {
    CircularBuffer *ring = (CircularBuffer*)arg;
    const size_t payload_cap = PACKET_SIZE - HEADER_SIZE;

    FILE *file = fopen("input.txt", "r");
    if (file == NULL) {
        printf("Failed to open file.\n");
        pthread_exit(NULL);
    }
    char *data = calloc(1, PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        fclose(file);
        pthread_exit(NULL);
    }
    /* The initializer zero-fills the bytes after the text, so exactly
     * HEADER_SIZE defined bytes are copied. */
    const char header[HEADER_SIZE] = "DATA HEADER";
    memcpy(data, header, HEADER_SIZE);

    size_t bytesRead;
    while ((bytesRead = fread(data + HEADER_SIZE, 1, payload_cap, file)) > 0) {
        /* A short (final) read leaves bytes of the previous chunk behind;
         * clear them so nothing stale can follow the valid payload. */
        memset(data + HEADER_SIZE + bytesRead, 0, payload_cap - bytesRead);
        circular_buffer_write(ring, data, HEADER_SIZE + (int)bytesRead);
    }
    free(data);
    fclose(file);
    pthread_exit(NULL);
}
/*
 * Consumer thread: takes packets from the ring, strips the header and the
 * trailing zero padding, hex-dumps the remaining payload and sends it as a
 * UDP datagram to 127.0.0.1:1234.
 *
 * arg: the shared CircularBuffer.
 *
 * The payload length is recovered by trimming trailing zero bytes, so a
 * payload that genuinely ends in zero bytes is sent shortened.
 *
 * The loop ends only when a send fails; the ring carries no end-of-data
 * marker, so after the last packet the thread blocks in
 * circular_buffer_read(). Always finishes with pthread_exit(NULL).
 */
void* send_thread(void* arg) {
    CircularBuffer *ring = (CircularBuffer*)arg;
    int udp_socket = socket(AF_INET, SOCK_DGRAM, 0);
    if (udp_socket < 0) {
        printf("Failed to create UDP socket.\n");
        pthread_exit(NULL);
    }

    struct sockaddr_in server_address;
    /* Zero first so members that are not assigned below are well defined. */
    memset(&server_address, 0, sizeof server_address);
    server_address.sin_family = AF_INET;
    server_address.sin_port = htons(1234);
    server_address.sin_addr.s_addr = htonl(INADDR_LOOPBACK);

    char *data = malloc(PACKET_SIZE);
    if (data == NULL) {
        printf("Failed to allocate packet buffer.\n");
        close(udp_socket);
        pthread_exit(NULL);
    }

    for (;;) {
        circular_buffer_read(ring, data, PACKET_SIZE);

        /* Walk back from the end of the payload to the last non-zero byte. */
        int padded_data_length = PACKET_SIZE - HEADER_SIZE;
        while (padded_data_length > 0 &&
               data[HEADER_SIZE + padded_data_length - 1] == 0) {
            padded_data_length--;
        }

        /* Print the UDP sent data in hexadecimal format. */
        print_hex(data + HEADER_SIZE, padded_data_length);
        ssize_t bytesSent = sendto(udp_socket, data + HEADER_SIZE, padded_data_length, 0,
                                   (struct sockaddr*)&server_address, sizeof(server_address));
        if (bytesSent < 0) {
            printf("Failed to send UDP packet.\n");
            break; /* leave through the cleanup below instead of leaking */
        }
    }
    free(data);
    close(udp_socket);
    pthread_exit(NULL);
}
Linux C 建立一个256M大小的一维数组环形队列,建立两个线程:一个用于读取指定文件内容存入环形队列,一个用UDP每次读取环形队列中8192字节发送到指定地址,不足8192仅发送剩余数据。读取数据时最大以1024字节为一包数据读取,读取时每包数据自定义16字节的数据包头,作为有效数据存入环形队列;如果环形队列数据满了,等待UDP发送数据后有空闲空间,读取线程再继续读。环形队列读写、初始化环形缓冲区、销毁环形缓冲区分别写成独立函数,UDP将在fread读取的数据发送完成后结束所有线程。分别用16进制printf打印写入缓冲区的数据和UDP发送的数据
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
/* Ring capacity in bytes (256 MiB). Parenthesised so the macro stays a single
 * operand inside expressions: with the bare `256 * 1024 * 1024` form,
 * `x % BUFFER_SIZE` parsed as `(x % 256) * 1024 * 1024`. */
#define BUFFER_SIZE (256 * 1024 * 1024)
/* Size of the UDP thread's local buffer. */
#define PACKET_SIZE 8192
/* Size of one packet produced by the reader thread, header included. */
#define MAX_PACKET_SIZE 1024
/* Bytes reserved for the header at the start of every packet. */
#define HEADER_SIZE 16
/* Destination UDP port used by main(); it was referenced but never defined.
 * Placeholder value: set it to the real destination port. */
#define UDP_PORT 1234
/* Byte-stream ring buffer. The storage is embedded in the struct, so a single
 * CircularBuffer object is slightly larger than BUFFER_SIZE bytes. */
typedef struct {
    char buffer[BUFFER_SIZE]; /* ring storage */
    int read_index;           /* offset of the next byte to read */
    int write_index;          /* offset of the next byte to write */
    int count;                /* bytes currently stored */
    pthread_mutex_t lock;     /* held by the read/write functions while they run */
    pthread_cond_t full;      /* signalled after a write; the reader waits on it while count == 0 */
    pthread_cond_t empty;     /* signalled after a read; the writer waits on it while the ring is full */
} CircularBuffer;
/* Arguments handed to readerThread(). */
typedef struct {
    CircularBuffer *buffer; /* ring the file contents are written into */
    FILE *file;             /* open input stream; readerThread() closes it */
} ReaderArgs;
/* Arguments handed to udpThread(). */
typedef struct {
    CircularBuffer *buffer;   /* ring the datagram payloads are read from */
    struct sockaddr_in *addr; /* destination address for sendto() */
} UdpArgs;
/* Put an existing CircularBuffer into the empty state and create its mutex
 * and condition variables. The byte storage itself is not cleared. */
void initCircularBuffer(CircularBuffer* buffer) {
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->empty, NULL);
    pthread_cond_init(&buffer->full, NULL);

    buffer->count = 0;
    buffer->write_index = 0;
    buffer->read_index = 0;
}
/* Destroy the synchronisation objects created by initCircularBuffer().
 * No memory is freed here; the storage is part of the struct. */
void destroyCircularBuffer(CircularBuffer* buffer) {
    pthread_cond_destroy(&buffer->empty);
    pthread_cond_destroy(&buffer->full);
    pthread_mutex_destroy(&buffer->lock);
}
/*
 * Append `size` bytes from `data` to the ring.
 *
 * Blocks until the ring has room for the whole piece, so the bytes of one
 * call are stored contiguously in stream order. A request larger than the
 * ring itself is stored in ring-sized pieces. The copy is split in two when
 * it crosses the end of the storage array.
 *
 * Previously a request larger than the free space wrote what fitted and then
 * spun forever with the lock held, and a copy that crossed the end of the
 * array ran past it.
 */
void writeToCircularBuffer(CircularBuffer* buffer, const char* data, int size) {
    const int capacity = (BUFFER_SIZE);
    int done = 0;

    pthread_mutex_lock(&buffer->lock);
    while (done < size) {
        int piece = size - done;
        if (piece > capacity) {
            piece = capacity;
        }
        /* pthread_cond_wait releases the lock while waiting, letting the
         * reader drain the ring. */
        while (capacity - buffer->count < piece) {
            pthread_cond_wait(&buffer->empty, &buffer->lock);
        }

        int first = capacity - buffer->write_index; /* bytes up to the array end */
        if (first > piece) {
            first = piece;
        }
        memcpy(buffer->buffer + buffer->write_index, data + done, (size_t)first);
        memcpy(buffer->buffer, data + done + first, (size_t)(piece - first));

        buffer->write_index = (buffer->write_index + piece) % capacity;
        buffer->count += piece;
        done += piece;
        pthread_cond_signal(&buffer->full);
    }
    pthread_mutex_unlock(&buffer->lock);
}

/*
 * Copy up to `size` bytes from the ring into `dest`.
 *
 * Blocks while the ring is empty. When fewer than `size` bytes are stored,
 * only those are copied; the number actually copied is not reported to the
 * caller. The copy is split in two when it crosses the end of the storage
 * array.
 */
void readFromCircularBuffer(CircularBuffer* buffer, char* dest, int size) {
    const int capacity = (BUFFER_SIZE);

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->full, &buffer->lock);
    }

    int take = (size < buffer->count) ? size : buffer->count;
    if (take < 0) {
        take = 0;
    }
    int first = capacity - buffer->read_index; /* bytes up to the array end */
    if (first > take) {
        first = take;
    }
    memcpy(dest, buffer->buffer + buffer->read_index, (size_t)first);
    memcpy(dest + first, buffer->buffer, (size_t)(take - first));

    buffer->read_index = (buffer->read_index + take) % capacity;
    buffer->count -= take;
    pthread_cond_signal(&buffer->empty);
    pthread_mutex_unlock(&buffer->lock);
}
/*
 * Producer thread: reads the file in chunks of at most
 * MAX_PACKET_SIZE - HEADER_SIZE bytes and writes each chunk to the ring,
 * preceded by a HEADER_SIZE-byte header whose first sizeof(int) bytes hold
 * the payload length.
 *
 * arg: ReaderArgs with the ring and an open FILE; the file is closed here.
 * Always finishes with pthread_exit(NULL).
 */
void* readerThread(void* arg) {
    ReaderArgs *args = (ReaderArgs*)arg;
    CircularBuffer *ring = args->buffer;
    FILE *file = args->file;
    char packet[MAX_PACKET_SIZE];
    size_t got;

    while ((got = fread(packet + HEADER_SIZE, sizeof(char), MAX_PACKET_SIZE - HEADER_SIZE, file)) > 0) {
        int bytesRead = (int)got;
        /* Clear the whole header first: only its first sizeof(int) bytes are
         * filled in, and the rest would otherwise be uninitialised stack
         * memory copied into the ring. */
        memset(packet, 0, HEADER_SIZE);
        memcpy(packet, &bytesRead, sizeof(int));
        /* Write header and payload to the ring in one call. */
        writeToCircularBuffer(ring, packet, bytesRead + HEADER_SIZE);
    }
    fclose(file);
    pthread_exit(NULL);
}
void* udpThread(void* arg) {
UdpArgs* args = (UdpArgs*)arg;
CircularBuffer* buffer = args->buffer;
struct sockaddr_in* addr = args->addr;
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
char packet[PACKET_SIZE];
int bytesSent, dataLength;
while (1) {
// Read from the circular buffer
readFromCircularBuffer(buffer, packet, PACKET_SIZE);
// Extract data length from header
memcpy(&dataLength, packet, sizeof(int));
// Send UDP packet
bytesSent = sendto(sockfd, packet + HEADER_SIZE, dataLength, 0, (struct sockaddr*)addr, sizeof(struct sockaddr));
if (bytesSent < 0) {
perror("Failed to send UDP packet");
break;
}
}
close(sockfd);
pthread_exit(NULL);
}
/* Print `size` bytes of `buffer` to stdout as two-digit uppercase hex values,
 * 16 per line, followed by a final newline. */
void printBufferAsHex(const char* buffer, int size) {
    int i;
    for (i = 0; i < size; i++) {
        printf("%02X ", (unsigned char)buffer[i]);
        if ((i + 1) % 16 == 0)
            printf("\n"); /* was "n": the escape's backslash was missing */
    }
    printf("\n");
}
/*
 * Entry point: opens the input file, starts the reader and UDP threads on a
 * shared ring, waits for both, then hex-dumps the entire ring storage.
 *
 * Returns EXIT_SUCCESS, or EXIT_FAILURE when the file, the destination
 * address or a thread cannot be set up.
 *
 * Note: udpThread() blocks in readFromCircularBuffer() once the ring is
 * empty, so the second join normally does not return.
 */
int main() {
    /* Static storage: the struct embeds the whole BUFFER_SIZE-byte array,
     * which is far larger than a typical stack allows for a local variable. */
    static CircularBuffer buffer;
    initCircularBuffer(&buffer);

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof addr);
    addr.sin_family = AF_INET;
    addr.sin_port = htons(UDP_PORT); /* replace with the required UDP port */
    /* Replace with the required destination IP address. */
    if (inet_aton("127.0.0.1", &addr.sin_addr) == 0) {
        fprintf(stderr, "Invalid destination address\n");
        return EXIT_FAILURE;
    }

    FILE* file = fopen("path/to/file", "rb");
    if (!file) {
        perror("Failed to open file");
        return EXIT_FAILURE;
    }

    pthread_t readerThreadID, udpThreadID;
    ReaderArgs readerArgs = { .buffer = &buffer, .file = file };
    UdpArgs udpArgs = { .buffer = &buffer, .addr = &addr };

    /* pthread_create reports failure through its return value, not errno. */
    int rc = pthread_create(&readerThreadID, NULL, readerThread, (void*)&readerArgs);
    if (rc != 0) {
        fprintf(stderr, "Failed to create reader thread: %s\n", strerror(rc));
        fclose(file);
        return EXIT_FAILURE;
    }
    rc = pthread_create(&udpThreadID, NULL, udpThread, (void*)&udpArgs);
    if (rc != 0) {
        /* The reader thread owns `file` now; returning ends the process. */
        fprintf(stderr, "Failed to create UDP thread: %s\n", strerror(rc));
        return EXIT_FAILURE;
    }

    pthread_join(readerThreadID, NULL);
    /* Wakes the UDP thread if it is waiting. readFromCircularBuffer()
     * re-checks `count` and waits again while the ring is empty, so this
     * alone does not make the thread exit. */
    pthread_cond_signal(&buffer.full);
    pthread_join(udpThreadID, NULL);
    destroyCircularBuffer(&buffer);

    /* Print the buffer contents in hexadecimal (the full BUFFER_SIZE bytes). */
    printf("缓冲区内容:\n");
    printBufferAsHex(buffer.buffer, BUFFER_SIZE);
    return EXIT_SUCCESS;
}
二维数组队列
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h> /* inet_addr(), used by createUdpSocket() */
/* Number of packet slots passed to circularBufferInit() (a slot count, not a
 * byte count). Parenthesised so the macro stays a single operand inside
 * larger expressions.
 * NOTE(review): 256 * 1024 slots of PACKET_SIZE bytes is 2 GiB of slot
 * storage in total - confirm that this is intended. */
#define BUFFER_SIZE (256 * 1024)
/* Bytes allocated for each slot: header plus payload. */
#define PACKET_SIZE 8192
/* Bytes reserved for the header at the start of every slot. */
#define HEADER_SIZE 16
/* Ring of `size` separately allocated packet slots. The read/write functions
 * touch the indices, lengths and count only while holding `lock`. */
typedef struct {
    char **buffer;            /* array of `size` slot pointers, PACKET_SIZE bytes each */
    int *length;              /* payload length recorded for each slot */
    int size;                 /* number of slots */
    int read_idx;             /* slot index of the next packet to read */
    int write_idx;            /* slot index of the next packet to write */
    int count;                /* number of occupied slots */
    pthread_mutex_t lock;     /* protects the fields above */
    pthread_cond_t not_full;  /* signalled after a slot has been released */
    pthread_cond_t not_empty; /* signalled after a slot has been filled */
} CircularBuffer;
/* Arguments handed to readerThread(). */
typedef struct {
    CircularBuffer *buffer; /* ring the file contents are written into */
    const char *filename;   /* path of the file to read */
} ReaderParams;
/* Arguments handed to senderThread(). */
typedef struct {
    CircularBuffer *buffer; /* ring the packets are read from */
    const char *dest_ip;    /* destination IPv4 address in dotted notation */
    int dest_port;          /* destination UDP port (host byte order) */
} SenderParams;
/*
 * Initialise `buffer` as an empty ring of `size` slots, each PACKET_SIZE
 * bytes, and create its mutex and condition variables.
 *
 * The function has no way to report failure to its caller, so a
 * non-positive `size` or a failed allocation prints a message and terminates
 * the process instead of continuing with NULL pointers.
 */
void circularBufferInit(CircularBuffer *buffer, int size) {
    if (size <= 0) {
        fprintf(stderr, "circularBufferInit: invalid slot count %d\n", size);
        exit(EXIT_FAILURE);
    }

    buffer->buffer = malloc(sizeof *buffer->buffer * (size_t)size);
    buffer->length = malloc(sizeof *buffer->length * (size_t)size);
    if (buffer->buffer == NULL || buffer->length == NULL) {
        perror("Failed to allocate circular buffer");
        exit(EXIT_FAILURE);
    }
    for (int i = 0; i < size; i++) {
        buffer->buffer[i] = malloc(PACKET_SIZE);
        if (buffer->buffer[i] == NULL) {
            perror("Failed to allocate packet slot");
            exit(EXIT_FAILURE);
        }
        buffer->length[i] = 0;
    }

    buffer->size = size;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
}
/*
 * Store one packet in the next free slot, blocking while all slots are in
 * use.
 *
 * The slot is laid out as a HEADER_SIZE-byte header, then `size` payload
 * bytes from `data`, then zero padding up to PACKET_SIZE. The payload length
 * is recorded in buffer->length[] for the reader.
 *
 * size: payload bytes at `data`; anything beyond PACKET_SIZE - HEADER_SIZE
 *       is dropped so the slot is never overrun.
 */
void circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) {
    /* A real HEADER_SIZE-byte array: the initializer zero-fills the tail.
     * Copying HEADER_SIZE bytes directly from the 14-byte string literal read
     * past its end. */
    static const char header[HEADER_SIZE] = "Custom Header";
    const size_t payload_cap = PACKET_SIZE - HEADER_SIZE;

    if (size > payload_cap) {
        size = payload_cap;
    }

    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == buffer->size) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }
    char *packet = buffer->buffer[buffer->write_idx];
    /* Header */
    memcpy(packet, header, HEADER_SIZE);
    /* Payload */
    memcpy(packet + HEADER_SIZE, data, size);
    /* Zero padding */
    memset(packet + HEADER_SIZE + size, 0, payload_cap - size);
    /* Record the payload length for this slot */
    buffer->length[buffer->write_idx] = (int)size;
    buffer->write_idx = (buffer->write_idx + 1) % buffer->size;
    buffer->count++;
    pthread_cond_signal(&buffer->not_empty);
    pthread_mutex_unlock(&buffer->lock);
}
/*
 * Copy the payload of the oldest packet into `data` and release its slot,
 * blocking while the ring is empty.
 *
 * data: destination buffer.
 * size: capacity of `data`; at most this many bytes are copied (it was
 *       previously ignored, so a longer payload would overrun `data`).
 *
 * Only the recorded payload bytes are copied; the header and the zero
 * padding are skipped. The payload length is not reported to the caller.
 */
void circularBufferRead(CircularBuffer *buffer, void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }
    const char *packet = buffer->buffer[buffer->read_idx];
    int packet_size = buffer->length[buffer->read_idx];

    size_t to_copy = (packet_size > 0) ? (size_t)packet_size : 0;
    if (to_copy > size) {
        to_copy = size;
    }
    /* Read the payload; the padding zeros are not copied. */
    memcpy(data, packet + HEADER_SIZE, to_copy);

    buffer->read_idx = (buffer->read_idx + 1) % buffer->size;
    buffer->count--;
    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}
/* Release everything circularBufferInit() set up: the synchronisation
 * objects, every slot, and the slot and length arrays. The counters are
 * reset to zero; the struct itself belongs to the caller. */
void circularBufferDestroy(CircularBuffer *buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);

    int slot = 0;
    while (slot < buffer->size) {
        free(buffer->buffer[slot]);
        slot++;
    }
    free(buffer->buffer);
    free(buffer->length);

    buffer->count = 0;
    buffer->write_idx = 0;
    buffer->read_idx = 0;
    buffer->size = 0;
}
/*
 * Producer thread: reads the file named in ReaderParams in chunks of up to
 * PACKET_SIZE - HEADER_SIZE bytes and queues every chunk, pausing 1 ms after
 * each one.
 *
 * Returns NULL in all cases; a file that cannot be opened is reported with
 * perror().
 */
void *readerThread(void *params) {
    const ReaderParams *cfg = params;

    FILE *in = fopen(cfg->filename, "rb");
    if (!in) {
        perror("Failed to open file");
        return NULL;
    }

    char chunk[PACKET_SIZE];
    for (;;) {
        size_t got = fread(chunk, 1, PACKET_SIZE - HEADER_SIZE, in);
        if (got == 0) {
            break;
        }
        circularBufferWrite(cfg->buffer, chunk, got);
        usleep(1000);
    }

    fclose(in);
    return NULL;
}
/*
 * Create a UDP socket and connect() it to dest_ip:dest_port, so that plain
 * send() can be used on it.
 *
 * dest_ip:   IPv4 address in dotted notation. "255.255.255.255" is rejected,
 *            because inet_addr() returns the same value for it as for a
 *            parse error.
 * dest_port: port in host byte order, 0..65535.
 *
 * Returns the socket descriptor, or -1 after printing a diagnostic. The
 * arguments are validated before the socket is created, so nothing is
 * leaked on failure.
 */
int createUdpSocket(const char *dest_ip, int dest_port) {
    if (dest_ip == NULL || dest_port < 0 || dest_port > 65535) {
        fprintf(stderr, "Invalid UDP destination\n");
        return -1;
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons((unsigned short)dest_port);
    server_addr.sin_addr.s_addr = inet_addr(dest_ip);
    if (server_addr.sin_addr.s_addr == INADDR_NONE) {
        fprintf(stderr, "Invalid destination address: %s\n", dest_ip);
        return -1;
    }

    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0) {
        perror("Failed to create UDP socket");
        return -1;
    }
    if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("Failed to connect to UDP server");
        close(sock);
        return -1;
    }
    return sock;
}
/*
 * Consumer thread: takes packets from the ring and sends each one as a
 * datagram of PACKET_SIZE - HEADER_SIZE bytes, pausing 1 ms after each.
 *
 * The local buffer is cleared before every read. circularBufferRead() copies
 * only the recorded payload, so without the clearing a short packet would be
 * sent followed by bytes of the previous one.
 *
 * Returns NULL immediately when the socket cannot be created. Otherwise the
 * loop never ends: a failed send is reported and the thread carries on, and
 * the ring has no end-of-data marker.
 */
void *senderThread(void *params) {
    SenderParams *senderParams = (SenderParams*)params;
    int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port);
    if (sock < 0) {
        /* createUdpSocket() has already printed the reason. */
        return NULL;
    }

    char data[PACKET_SIZE];
    while (1) {
        memset(data, 0, sizeof data);
        circularBufferRead(senderParams->buffer, data, PACKET_SIZE - HEADER_SIZE);
        if (send(sock, data, PACKET_SIZE - HEADER_SIZE, 0) < 0) {
            /* UDP is best effort: report the failure and keep going. */
            perror("Failed to send UDP packet");
        }
        usleep(1000);
    }
    close(sock);
    return NULL;
}
/*
 * Entry point: builds a ring of BUFFER_SIZE slots, starts the reader thread
 * on "input.txt" and the sender thread towards 127.0.0.1:12345, waits for
 * both and releases the ring.
 *
 * Returns 0 on success and 1 when a thread cannot be created.
 *
 * Note: senderThread() loops forever once its socket is open, so the second
 * join normally does not return.
 */
int main() {
    CircularBuffer buffer;
    circularBufferInit(&buffer, BUFFER_SIZE);

    ReaderParams readerParams;
    readerParams.buffer = &buffer;
    readerParams.filename = "input.txt";

    SenderParams senderParams;
    senderParams.buffer = &buffer;
    senderParams.dest_ip = "127.0.0.1"; /* destination address */
    senderParams.dest_port = 12345;     /* destination port */

    pthread_t readerThreadId;
    pthread_t senderThreadId;
    /* pthread_create reports failure through its return value, not errno. */
    int rc = pthread_create(&readerThreadId, NULL, readerThread, &readerParams);
    if (rc != 0) {
        fprintf(stderr, "Failed to create reader thread: %s\n", strerror(rc));
        circularBufferDestroy(&buffer);
        return 1;
    }
    rc = pthread_create(&senderThreadId, NULL, senderThread, &senderParams);
    if (rc != 0) {
        /* The reader is already using the ring, so it is left untouched;
         * returning from main ends the whole process. */
        fprintf(stderr, "Failed to create sender thread: %s\n", strerror(rc));
        return 1;
    }

    pthread_join(readerThreadId, NULL);
    pthread_join(senderThreadId, NULL);
    circularBufferDestroy(&buffer);
    return 0;
}
备注
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h> /* inet_addr(), used by createUdpSocket() */
/* Number of packet slots passed to circularBufferInit() (a slot count, not a
 * byte count). Parenthesised so the macro stays a single operand inside
 * larger expressions.
 * NOTE(review): 256 * 1024 slots of PACKET_SIZE bytes is 2 GiB of slot
 * storage in total - confirm that this is intended. */
#define BUFFER_SIZE (256 * 1024)
/* Bytes allocated for each slot: header plus payload. */
#define PACKET_SIZE 8192
/* Bytes reserved for the header at the start of every slot. */
#define HEADER_SIZE 16
/* Ring of `size` separately allocated packet slots. The read/write functions
 * touch the indices, lengths and count only while holding `lock`. */
typedef struct {
    char **buffer;            /* array of `size` slot pointers, PACKET_SIZE bytes each */
    int *length;              /* payload length recorded for each slot */
    int size;                 /* number of slots */
    int read_idx;             /* slot index of the next packet to read */
    int write_idx;            /* slot index of the next packet to write */
    int count;                /* number of occupied slots */
    pthread_mutex_t lock;     /* protects the fields above */
    pthread_cond_t not_full;  /* signalled after a slot has been released */
    pthread_cond_t not_empty; /* signalled after a slot has been filled */
} CircularBuffer;
/* Arguments handed to readerThread(). */
typedef struct {
    CircularBuffer *buffer; /* ring the file contents are written into */
    const char *filename;   /* path of the file to read */
} ReaderParams;
/* Arguments handed to senderThread(). */
typedef struct {
    CircularBuffer *buffer; /* ring the packets are read from */
    const char *dest_ip;    /* destination IPv4 address in dotted notation */
    int dest_port;          /* destination UDP port (host byte order) */
} SenderParams;
/*
 * Initialise `buffer` as an empty ring of `size` slots, each PACKET_SIZE
 * bytes, and create its mutex and condition variables.
 *
 * The function has no way to report failure to its caller, so a
 * non-positive `size` or a failed allocation prints a message and terminates
 * the process instead of continuing with NULL pointers.
 */
void circularBufferInit(CircularBuffer *buffer, int size) {
    if (size <= 0) {
        fprintf(stderr, "circularBufferInit: invalid slot count %d\n", size);
        exit(EXIT_FAILURE);
    }

    buffer->buffer = malloc(sizeof *buffer->buffer * (size_t)size);
    buffer->length = malloc(sizeof *buffer->length * (size_t)size);
    if (buffer->buffer == NULL || buffer->length == NULL) {
        perror("Failed to allocate circular buffer");
        exit(EXIT_FAILURE);
    }
    for (int i = 0; i < size; i++) {
        buffer->buffer[i] = malloc(PACKET_SIZE);
        if (buffer->buffer[i] == NULL) {
            perror("Failed to allocate packet slot");
            exit(EXIT_FAILURE);
        }
        buffer->length[i] = 0;
    }

    buffer->size = size;
    buffer->read_idx = 0;
    buffer->write_idx = 0;
    buffer->count = 0;
    pthread_mutex_init(&buffer->lock, NULL);
    pthread_cond_init(&buffer->not_full, NULL);
    pthread_cond_init(&buffer->not_empty, NULL);
}
/*
 * Store one packet in the next free slot, blocking while all slots are in
 * use.
 *
 * The function used to be defined twice in a row (once with the parameter
 * named `writbuffer`, once annotated), which is a redefinition error; this
 * is the single merged definition.
 *
 * The slot is laid out as a HEADER_SIZE-byte header, then `size` payload
 * bytes from `data`, then zero padding up to PACKET_SIZE. The payload length
 * is recorded in buffer->length[] for the reader.
 *
 * size: payload bytes at `data`; anything beyond PACKET_SIZE - HEADER_SIZE
 *       is dropped so the slot is never overrun.
 */
void circularBufferWrite(CircularBuffer *buffer, const void *data, size_t size) {
    /* A real HEADER_SIZE-byte array: the initializer zero-fills the tail.
     * Copying HEADER_SIZE bytes directly from the 14-byte string literal read
     * past its end. */
    static const char header[HEADER_SIZE] = "Custom Header";
    const size_t payload_cap = PACKET_SIZE - HEADER_SIZE;

    if (size > payload_cap) {
        size = payload_cap;
    }

    pthread_mutex_lock(&buffer->lock);
    /* Full check: buffer->count is the number of packets currently stored
     * and buffer->size the maximum number the ring can hold. While they are
     * equal the ring is full and the writer waits here. */
    while (buffer->count == buffer->size) {
        pthread_cond_wait(&buffer->not_full, &buffer->lock);
    }
    /* buffer->buffer is an array of slot pointers; take the slot at the
     * current write position. */
    char *packet = buffer->buffer[buffer->write_idx];
    /* Header */
    memcpy(packet, header, HEADER_SIZE);
    /* Payload */
    memcpy(packet + HEADER_SIZE, data, size);
    /* Zero padding: non-empty when this packet carries less than a full
     * payload. */
    memset(packet + HEADER_SIZE + size, 0, payload_cap - size);
    /* Record the payload length for this slot */
    buffer->length[buffer->write_idx] = (int)size;
    buffer->write_idx = (buffer->write_idx + 1) % buffer->size;
    buffer->count++;
    /* Tell a waiting reader that the ring is no longer empty. */
    pthread_cond_signal(&buffer->not_empty);
    /* Release the ring's mutex; the write is complete. */
    pthread_mutex_unlock(&buffer->lock);
}
/*
 * Copy the payload of the oldest packet into `data` and release its slot,
 * blocking while the ring is empty.
 *
 * data: destination buffer.
 * size: capacity of `data`; at most this many bytes are copied (it was
 *       previously ignored, so a longer payload would overrun `data`).
 *
 * Only the recorded payload bytes are copied; the header and the zero
 * padding are skipped. The payload length is not reported to the caller.
 */
void circularBufferRead(CircularBuffer *buffer, void *data, size_t size) {
    pthread_mutex_lock(&buffer->lock);
    while (buffer->count == 0) {
        pthread_cond_wait(&buffer->not_empty, &buffer->lock);
    }
    const char *packet = buffer->buffer[buffer->read_idx];
    int packet_size = buffer->length[buffer->read_idx];

    size_t to_copy = (packet_size > 0) ? (size_t)packet_size : 0;
    if (to_copy > size) {
        to_copy = size;
    }
    /* Read the payload; the padding zeros are not copied. */
    memcpy(data, packet + HEADER_SIZE, to_copy);

    buffer->read_idx = (buffer->read_idx + 1) % buffer->size;
    buffer->count--;
    pthread_cond_signal(&buffer->not_full);
    pthread_mutex_unlock(&buffer->lock);
}
/* Release everything circularBufferInit() set up: the synchronisation
 * objects, every slot, and the slot and length arrays. The counters are
 * reset to zero; the struct itself belongs to the caller. */
void circularBufferDestroy(CircularBuffer *buffer) {
    pthread_mutex_destroy(&buffer->lock);
    pthread_cond_destroy(&buffer->not_full);
    pthread_cond_destroy(&buffer->not_empty);

    int slot = 0;
    while (slot < buffer->size) {
        free(buffer->buffer[slot]);
        slot++;
    }
    free(buffer->buffer);
    free(buffer->length);

    buffer->count = 0;
    buffer->write_idx = 0;
    buffer->read_idx = 0;
    buffer->size = 0;
}
/*
 * Producer thread: reads the file named in ReaderParams in chunks of up to
 * PACKET_SIZE - HEADER_SIZE bytes and queues every chunk, pausing 1 ms after
 * each one.
 *
 * Returns NULL in all cases; a file that cannot be opened is reported with
 * perror().
 */
void *readerThread(void *params) {
    const ReaderParams *cfg = params;

    FILE *in = fopen(cfg->filename, "rb");
    if (!in) {
        perror("Failed to open file");
        return NULL;
    }

    char chunk[PACKET_SIZE];
    for (;;) {
        size_t got = fread(chunk, 1, PACKET_SIZE - HEADER_SIZE, in);
        if (got == 0) {
            break;
        }
        circularBufferWrite(cfg->buffer, chunk, got);
        usleep(1000);
    }

    fclose(in);
    return NULL;
}
/*
 * Create a UDP socket and connect() it to dest_ip:dest_port, so that plain
 * send() can be used on it.
 *
 * dest_ip:   IPv4 address in dotted notation. "255.255.255.255" is rejected,
 *            because inet_addr() returns the same value for it as for a
 *            parse error.
 * dest_port: port in host byte order, 0..65535.
 *
 * Returns the socket descriptor, or -1 after printing a diagnostic. The
 * arguments are validated before the socket is created, so nothing is
 * leaked on failure.
 */
int createUdpSocket(const char *dest_ip, int dest_port) {
    if (dest_ip == NULL || dest_port < 0 || dest_port > 65535) {
        fprintf(stderr, "Invalid UDP destination\n");
        return -1;
    }

    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_port = htons((unsigned short)dest_port);
    server_addr.sin_addr.s_addr = inet_addr(dest_ip);
    if (server_addr.sin_addr.s_addr == INADDR_NONE) {
        fprintf(stderr, "Invalid destination address: %s\n", dest_ip);
        return -1;
    }

    int sock = socket(AF_INET, SOCK_DGRAM, 0);
    if (sock < 0) {
        perror("Failed to create UDP socket");
        return -1;
    }
    if (connect(sock, (struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
        perror("Failed to connect to UDP server");
        close(sock);
        return -1;
    }
    return sock;
}
/*
 * Consumer thread: takes packets from the ring and sends each one as a
 * datagram of PACKET_SIZE - HEADER_SIZE bytes, pausing 1 ms after each.
 *
 * The local buffer is cleared before every read. circularBufferRead() copies
 * only the recorded payload, so without the clearing a short packet would be
 * sent followed by bytes of the previous one.
 *
 * Returns NULL immediately when the socket cannot be created. Otherwise the
 * loop never ends: a failed send is reported and the thread carries on, and
 * the ring has no end-of-data marker.
 */
void *senderThread(void *params) {
    SenderParams *senderParams = (SenderParams*)params;
    int sock = createUdpSocket(senderParams->dest_ip, senderParams->dest_port);
    if (sock < 0) {
        /* createUdpSocket() has already printed the reason. */
        return NULL;
    }

    char data[PACKET_SIZE];
    while (1) {
        memset(data, 0, sizeof data);
        circularBufferRead(senderParams->buffer, data, PACKET_SIZE - HEADER_SIZE);
        if (send(sock, data, PACKET_SIZE - HEADER_SIZE, 0) < 0) {
            /* UDP is best effort: report the failure and keep going. */
            perror("Failed to send UDP packet");
        }
        usleep(1000);
    }
    close(sock);
    return NULL;
}
/*
 * Entry point: builds a ring of BUFFER_SIZE slots, starts the reader thread
 * on "input.txt" and the sender thread towards 127.0.0.1:12345, waits for
 * both and releases the ring.
 *
 * Returns 0 on success and 1 when a thread cannot be created.
 *
 * Note: senderThread() loops forever once its socket is open, so the second
 * join normally does not return.
 */
int main() {
    CircularBuffer buffer;
    circularBufferInit(&buffer, BUFFER_SIZE);

    ReaderParams readerParams;
    readerParams.buffer = &buffer;
    readerParams.filename = "input.txt";

    SenderParams senderParams;
    senderParams.buffer = &buffer;
    senderParams.dest_ip = "127.0.0.1"; /* destination address */
    senderParams.dest_port = 12345;     /* destination port */

    pthread_t readerThreadId;
    pthread_t senderThreadId;
    /* pthread_create reports failure through its return value, not errno. */
    int rc = pthread_create(&readerThreadId, NULL, readerThread, &readerParams);
    if (rc != 0) {
        fprintf(stderr, "Failed to create reader thread: %s\n", strerror(rc));
        circularBufferDestroy(&buffer);
        return 1;
    }
    rc = pthread_create(&senderThreadId, NULL, senderThread, &senderParams);
    if (rc != 0) {
        /* The reader is already using the ring, so it is left untouched;
         * returning from main ends the whole process. */
        fprintf(stderr, "Failed to create sender thread: %s\n", strerror(rc));
        return 1;
    }

    pthread_join(readerThreadId, NULL);
    pthread_join(senderThreadId, NULL);
    circularBufferDestroy(&buffer);
    return 0;
}