All of the interesting technological, artistic or just plain fun subjects I'd investigate if I had an infinite number of lifetimes. In other words, a dumping ground...

Thursday 10 April 2008

C++ priority queue with multiple threads

file mini.h

/* Copyright Tim O'Hare 2008 */
#ifndef _MINI_H
#define _MINI_H_
#include
#include
#define DEBUG false
class Mini
{
public:
Mini(long i): in(i) {};
Mini * pop(void) { Mini * rtn = qu.front(); qu.pop(); return rtn;};
void push(Mini * min) { qu.push(min); };
private:
long in;
std::queue qu;


};

class Message {
public:
/* for left < right; lefts cost saving is greater that right */
bool operator< (const Message& x) const { return costsave > x.costsave; }
void service(void) { if (DEBUG) std::cout << "cost saving: " << costsave << std::endl;};
Message(int costsaving): costsave(costsaving) {};
private:
int costsave;
};

class Lock {
public:
Lock(pthread_rwlock_t & stat_rwlock);
~Lock();
private:
pthread_rwlock_t rwlock;
};
#endif




file: main.cc
/* Copyright Tim O'Hare */
#include
#include
#include
#include
#include
#include "mini.h"

using namespace std;

vector < pthread_t * >threads;
pthread_rwlock_t stat_rwlock;

Lock::Lock(pthread_rwlock_t & stat_rwlock): rwlock(stat_rwlock)
{
pthread_rwlock_wrlock(&stat_rwlock);
//rwlock = stat_rwlock;
}
Lock::~Lock()
{
pthread_rwlock_unlock(&stat_rwlock);
}

/* C++ book Pg. 479 */
//void server(priority_queue&q, Lock& lck)
void server(priority_queue*q, pthread_rwlock_t& lck)
{
while(!q->empty()) {
if (DEBUG) cout << "q is not empty" << endl;
Message m(0);
{
//Lock lock(lck); // only hold the lock while extracting the message
Lock lock(stat_rwlock); // only hold the lock while extracting the message
if (DEBUG) cout << "delete: have the lock" << endl;
if (q->empty()) return;
m = q->top();
q->pop();
}
m.service();
}
if (DEBUG) cout << "q empty" << endl;
}

void sleeptime()
{
struct timeval now;
gettimeofday(&now, NULL);
unsigned int seed = now.tv_usec;
usleep(rand_r(&seed));
}

void createMessage(priority_queue*qu, int i)
{
Message m(i);
if (DEBUG) cout << "created message " << i << endl;
Lock lock(stat_rwlock); // only hold the lock while extracting the message
if (DEBUG) cout << "add: have the lock" << endl;
qu->push(m);
}

void* createMini(void *ptr)
{
if (DEBUG) cout << "Thread for createMini started" << endl;
int i=0;
//priority_queue*q1 = (priority_queue*)ptr;
//priority_queuequ = *q1;
priority_queue*qu = (priority_queue*)ptr;
for (;;i++) {
if (DEBUG) cout << "inserting " << i << endl;
createMessage(qu,i);
//sleep(1);
sleeptime();
}
}


void* deleteMini(void *ptr)
{
if (DEBUG) cout << "Thread for deleteMini started" << endl;
priority_queue*qu = (priority_queue*)ptr;
long i;
for (;;i++) {
if (DEBUG) cout << "Calling server" << endl;
server(qu, stat_rwlock);
if (DEBUG) cout << "Server finished" << endl;
//sleep(1);
//unsigned int now=time(NULL);
//usleep(rand_r(&now));
sleeptime();
}
}

int main()
{
//queue qu;
/*for (int i=0; i < 10; i++ )
qu.push(i);*/

//Mini * min2 = new Mini(0);
pthread_rwlock_init(&stat_rwlock, NULL);
priority_queuequ;
//server(qu,stat_rwlock);

pthread_t *tid1;
for (int j=1;j<100;j++) {
tid1 = new pthread_t;
threads.push_back(tid1);
if (pthread_create(tid1, NULL, &createMini, &qu)) {
perror("Failed to create createMini thread: ");
return -1;
}
}

pthread_t *tid2;
for (int k=1;k<100;k++) {
tid2 = new pthread_t;
threads.push_back(tid2);

//if (pthread_create(tid2, NULL, &deleteMini, min2)) {
if (pthread_create(tid2, NULL, &deleteMini, &qu)) {
perror("Failed to create deleteMini thread: ");
return -1;
}
}

pthread_join(*tid1, NULL);
pthread_join(*tid2, NULL);
return 0;
}



compile on Linux:
g++ -Wall main.cc -o qu -lpthread

No comments:

tim's shared items

Add to Google Reader or Homepage