Читаем Системное программирование в среде Windows полностью

#include "messages.h"

#include

#define DELAY_COUNT 1000

#define MAX_THREADS 1024

/* Размеры и коэффициенты блокирования очередей. Эти величины являются */

/* произвольными и могут регулироваться для обеспечения оптимальной */

/* производительности. Текущие значения не являются сбалансированными. */

#define TBLOCK_SIZE 5 /*Передающий поток формирует группы из 5 сообщений.*/

#define TBLOCK_TIMEOUT 50 /*Интервал ожидания сообщений передающим потоком.*/

#define P2T_QLEN 10 /* Размер очереди "производитель/передающий поток". */

#define T2R_QLEN 4 /*Размер очереди "передающий поток/принимающий поток".*/

#define R2C_QLEN 4 /* Размер очереди "принимающий поток/потребитель" -- */

/* для каждого потребителя существует только одна очередь.*/

DWORD WINAPI producer(PVOID);

DWORD WINAPI consumer(PVOID);

DWORD WINAPI transmitter(PVOID);

DWORD WINAPI receiver(PVOID);

typedef struct _THARG {

 volatile DWORD thread_number;

 volatile DWORD work_goal; /* Используется потоками производителей. */

 volatile DWORD work_done; /* Используется потоками производителей и потребителей. */ '

 char future[8]; 

} THARG;

/* Сгруппированные сообщения, посылаемые передающим потоком потребителю.*/

typedef struct t2r_msg_tag {

 volatile DWORD num_msgs; /* Количество содержащихся сообщений. */

 msg_block_t messages[TBLOCK_SIZE];

} t2r_msg_t;

queue_t p2tq, t2rq, *r2cq_array;

static volatile DWORD ShutDown = 0;

static DWORD EventTimeout = 50;

DWORD _tmain(DWORD argc, LPTSTR * argv[]) {

 DWORD tstatus, nthread, ithread, goal, thid;

 HANDLE *producer_th, *consumer_th, transmitter_th, receiver_th;

 THARG *producer_arg, *consumer_arg;

 nthread = atoi(argv[1]);

 goal = atoi(argv[2]);

 producer_th = malloc(nthread * sizeof(HANDLE));

 producer_arg = calloc(nthread, sizeof(THARG));

 consumer_th = malloc(nthread * sizeof(HANDLE));

 consumer_arg = calloc(nthread, sizeof(THARG));

 q_initialize(&p2tq, sizeof(msg_block_t), P2T_QLEN); 

 q_initialize(&t2rq, sizeof(t2r_msg_t), T2R_QLEN);

 /* Распределить ресурсы, инициализировать очереди "принимающий поток/потребитель" для каждого потребителя. */

 r2cq_array = calloc(nthread, sizeof(queue_t));

 for (ithread = 0; ithread < nthread; ithread++) {

  /* Инициализировать очередь r2с для потока данного потребителя. */

  q_initialize(&r2cq_array[ithread], sizeof(msg_block_t), R2C_QLEN);

  /* Заполнить аргументы потока. */

  consumer_arg[ithread].thread_number = ithread;

  consumer_arg[ithread].work_goal = goal;

  consumer_arg[ithread].work_done = 0;

  consumer_th[ithread] = (HANDLE)_beginthreadex(NULL, 0, consumer, (PVOID)&consumer_arg[ithread], 0, &thid);

Перейти на страницу:
Нет соединения с сервером, попробуйте зайти чуть позже