Commit a702df11 authored by Edward Vigmond's avatar Edward Vigmond
Browse files

Small cleanup of threaded data reading code.

parent cf9e7199
......@@ -6,6 +6,7 @@
#include <zlib.h>
#include<string>
/** reader for case when all data can fit in computer RAM at once */
template<class T>
class DataAllInMem : public DataClass<T> {
......@@ -20,7 +21,7 @@ class DataAllInMem : public DataClass<T> {
~DataAllInMem( );
virtual T max(int); // maximum value at a time instance
virtual T max(); // maximum value
virtual T min(int); // minimum value at a one time
virtual T min(int); // minimum value at a time
virtual T min(); // minimum value
virtual T* slice(int); // return pointer to data slice
virtual void time_series( int, T* ); // time series for a point
......
......@@ -14,6 +14,7 @@ FileType FileTypeFinder ( const char *fn );
void CG_file_list( map<int,string>&filelist, const char *fn );
/** The basic class for reading data */
template<class T>
class DataClass {
public:
......@@ -32,11 +33,11 @@ class DataClass {
DataClass():data(NULL),maxtm(0),last_tm(-1),slice_size(0){}
protected:
T* data;
int maxtm;
int last_tm;
int slice_size;
string filename;
T* data; //!< data
int maxtm; //!< number of time slice
int last_tm; //!< last time that can be requested
int slice_size; //!< amountof data in one time slice
string filename; //!< file containing data
};
#endif
......@@ -3,14 +3,13 @@
#define BLOCK_SLSZ 20 //!< Number of intermediary slice pointers
/** virtual class for threaded data reading */
template<class T>
class DataReader {
public:
virtual void reader()=0;
virtual void local_maxmin()=0;
virtual void abs_maxmin()=0;
virtual void tmsr()=0;
virtual void find_maxtm()=0;
......@@ -56,13 +55,6 @@ class DataReader {
// If necessary, find the byte offset at the beginning of each slice of time.
// Set maxmin->del_sl_ptr to 1 or 0 depending on whether byte offset was found: 1 for found, 0 for not found
//
// abs_maxmin():
// Given:
// Same given as constructor
// Required:
// Find the maximum and minimum values of the entire data set. Put the results in maxmin->abs_max and maxmin->abs_min
// NOTE: maxmin->abs_max, maxmin->abs_min is type void. You must type cast to proper type before assigning any values
//
// tmsr():
// Given:
// Same given as constructor except that slave->unlock has the offset value and slave->unlock has the slave->data has the array location where the value is to be stored
......
#include "DataSlicer.h"
// thread to read data into a buffer
void* spawn_fill( void *d ) {
pthread_detach( pthread_self() );
Spawn_fill_t* sf= (Spawn_fill_t*)d;
sem_post( sf->sem );
sf->buf->fill( sf->in, sf->f_mutex, sf->time );
delete sf;
return NULL;
}
DataSlicer :: DataSlicer( gzFile gf, const int of, const int ss,
const int t, const int ic):offset(of),
in(gf),slicesz(ss),tmax(t),inc(ic){
pthread_mutex_init( &f_mutex, NULL );
sem_init( &sem, 0, 0 );
for( int i=0; i<NUM_BUF_SLICES; i++ )
buffer[i] = new ThreadBuffer( slicesz, offset, &sem );
}
DataSlicer :: ~DataSlicer() {
pthread_mutex_destroy( &f_mutex );
for( int i=0; i<NUM_BUF_SLICES; i++ ){
delete buffer[i];
buffer[i] = NULL;
}
}
/*
* get a slice of data and put it in dest
*
* use the semaphore to make sure that there are no pending updates
*
* return -1 if an error
*/
int
DataSlicer::get( int t, void *dest )
{
int b;
if( t>tmax || t<0 )
return -1;
int semval;
bool retrieved = false;
do {
do {
for( b=0; b<NUM_BUF_SLICES; b++ )
if( buffer[b]->retrieve( t, dest ) != -1 ) {
retrieved=true;
break;
}
sem_getvalue( &sem, &semval );
}while( semval && b==NUM_BUF_SLICES );
if( retrieved==false ){
updatebuffers( t );
}
}while( retrieved==false );
updatebuffers( t );
}
/*
* spawn a thread to fill one buffer
*/
void DataSlicer :: updatebuffer( int b, int t ) {
if( t>=0 && t<= tmax ) {
Spawn_fill_t* sf = new Spawn_fill_t;
sf->buf = buffer[b];
sf->time = t;
sf->in = in;
sf->f_mutex = &f_mutex;
sf->sem = &sem;
if( pthread_create( &(thread[b]), NULL, spawn_fill, (void *)sf ) )
fprintf( stderr, "Error: cannot spawn new thread\n");
}
}
/*
* fill all buffers
*
* centre around the time t
*/
void DataSlicer :: updatebuffers( int t ) {
// determine first and last instances
int lower = t-inc*(NUM_BUF_SLICES/2-!(NUM_BUF_SLICES%2));
int upper = lower + (NUM_BUF_SLICES-1)*inc;
if( lower>upper ) {
int tmp = lower;
lower = upper;
upper = tmp;
}
while( lower<0 )
lower += abs(inc);
while( upper>tmax )
upper -= abs(inc);
int j=0;
for( int s=lower; s<=upper; s+=abs(inc) ){
int i;
for( i=0; i<NUM_BUF_SLICES; i++ ) //look for slice in buffer
if( buffer[i]->slice_no() == s )
break;
if( i==NUM_BUF_SLICES ) { // if not there, add it
for( ; j<NUM_BUF_SLICES; j++ ) {//find out-of-range slice
int sn = buffer[j]->slice_no();
if( sn<lower || sn>upper || ((sn-lower)%inc) ) {
updatebuffer( j, s );
j++;
break;
}
}
}
}
}
/*
* return the time series for a given point
*
* point - offset into slice for the point
* buf - buffer in which to place data
* size - size of datum
*/
void
DataSlicer :: timeSeries(int point, unsigned char* buf, int size )
{
int pos=offset+point;
pthread_mutex_lock( &f_mutex );
for( int i=0; i<=tmax; i++ ) {
gzseek( in, pos, SEEK_SET );
gzread( in, buf, size );
pos += slicesz;
buf += size;
}
pthread_mutex_unlock( &f_mutex );
}
#include "ThreadBuffer.h"
#include "IGBheader.h"
const int NUM_BUF_SLICES=5;// number of read ahead buffers
// data structure for pthread call
typedef struct spfill {
ThreadBuffer* buf; // buffer
int time; // time to load into buffer
gzFile in;
pthread_mutex_t* f_mutex;
sem_t* sem;
} Spawn_fill_t;
class DataSlicer {
public:
DataSlicer( gzFile gf, const int of, const int ss,
const int tmax, const int i );
~DataSlicer();
int get( int t, void * );
void increment( int i ){ inc = i; }
int increment(){ return inc; }
void timeSeries( int, unsigned char *, int );
private:
void updatebuffer( int b, int t );
void updatebuffers( int t );
pthread_t thread[NUM_BUF_SLICES]; // spawn thread for new data
ThreadBuffer* buffer[NUM_BUF_SLICES];// buffers for data
pthread_mutex_t f_mutex; // control access to in
sem_t sem; // semaphore for #fill buffers
gzFile in; // input file
int offset; // offset to data in file
int slicesz; // slice size
int tmax; // maximum time
int inc; // increment between slices
};
......@@ -14,7 +14,6 @@ class FileSeqCGreader : public DataReader<T> {
~FileSeqCGreader();
virtual void reader();
virtual void local_maxmin();
virtual void abs_maxmin();
virtual void tmsr();
virtual void find_maxtm();
......@@ -133,21 +132,6 @@ void FileSeqCGreader<T>::tmsr()
}
template<class T>
void FileSeqCGreader<T>::abs_maxmin()
{
maxmin_ptr->abs_max = maxmin_ptr->lmax[0];
maxmin_ptr->abs_min = maxmin_ptr->lmin[0];
for( int t=1; t<=mthread->maxtm; t++ ) {
if( maxmin_ptr->lmin[t] < maxmin_ptr->abs_min )
maxmin_ptr->abs_min = maxmin_ptr->lmin[t];
if( maxmin_ptr->lmax[t] > maxmin_ptr->abs_max )
maxmin_ptr->abs_max = maxmin_ptr->lmax[t];
}
}
/** determine local minim and maxima for each slice and build a list of
* offsets into various blocks of the file
*
......
......@@ -11,7 +11,6 @@ class IGBreader : public DataReader<T> {
~IGBreader();
virtual void reader();
virtual void local_maxmin();
virtual void abs_maxmin();
virtual void tmsr();
virtual void find_maxtm();
......@@ -103,31 +102,6 @@ void IGBreader<T>::local_maxmin() {
}
/** find the min and max over all time */
template<class T>
void IGBreader<T>::abs_maxmin() {
// Set file pointer to location 1024
gzseek(in, 1024, SEEK_SET);
read_IGB_data(data, 1, head, buf);
maxmin_ptr->abs_max = data[0];
maxmin_ptr->abs_min = data[0];
// Read in time slices and check for max and min
for (int i=0; i<mthread->maxtm+1; i++){
for (int j=0; j<mthread->slsz; j++){
if ( data[j] > maxmin_ptr->abs_max ){
maxmin_ptr->abs_max = data[j];
}
else if ( data[j] < maxmin_ptr->abs_min ){
maxmin_ptr->abs_min = data[j];
}
}
read_IGB_data(data, 1, head, buf);
}
}
/** get the time series for a point */
template<class T>
void IGBreader<T>::tmsr(){
......
......@@ -52,7 +52,6 @@ OBJS = Fl_Gl_Tb_Window.o\
Surfaces.o\
TBmeshWin.o\
Tetrahedral.o\
ThreadBuffer.o\
Trackball.o\
Triangle.o\
trimesh.o\
......
......@@ -1226,7 +1226,10 @@ void TBmeshWin::timeplot()
}
//! Draw the cliiping plane
/** Draw the clipping plane
*
* \param cp the clipping plane
*/
void TBmeshWin::draw_clip_plane( int cp )
{
const GLfloat clipPlaneOpacity = 0.3;
......@@ -1266,7 +1269,7 @@ void TBmeshWin::draw_clip_plane( int cp )
}
/* write out the frame buffer after a change to it is has been made
/** write out the frame buffer after a change to it is has been made
*
* \param fname base name for output files
*/
......
#include "ThreadBuffer.h"
ThreadBuffer :: ThreadBuffer( int sz, int of, sem_t* sm ): size(sz),
offset(of), sem(sm), number(-1), unread(true) {
pthread_mutex_init( &mutex, NULL );
buffer = new char[size];
}
void
ThreadBuffer :: fill( gzFile gf, pthread_mutex_t* fmutex, int t ) {
int pos = offset + t*size;
pthread_mutex_lock( fmutex );
gzseek( gf, pos, SEEK_SET );
pthread_mutex_lock( &mutex );
gzread( gf, buffer, size );
pthread_mutex_unlock( fmutex );
number = t;
unread = true;
pthread_mutex_unlock( &mutex );
}
/*
* get a time slice
* only read in the slice if the time has changed since the last call
*
* -1 indicates the buffer did not contain data for the requested time
*/
int
ThreadBuffer :: retrieve( int t, void *dest ) {
int retval=0;
pthread_mutex_lock( &mutex );
if( number==t && unread ) {
memcpy( dest, buffer, size );
unread == false;
} else
retval = -1;
pthread_mutex_unlock( &mutex );
sem_trywait( sem );
return retval;
}
#include <pthread.h>
#include <semaphore.h>
#include <zlib.h>
#include <string.h>
class ThreadBuffer {
public:
ThreadBuffer( int sz, int, sem_t* );
void fill( gzFile gf, pthread_mutex_t* fmutex, int t );
int retrieve(int, void *);
int slice_no(){ return number; }
~ThreadBuffer(){ delete[] buffer; pthread_mutex_destroy(&mutex); }
private:
pthread_mutex_t mutex; // control buffer access
sem_t* sem; // semaphore
int size; // size of slice
int number; // slice number stored
char* buffer; // store for uncompressed data
int offset; // position of first datum in file stream
bool unread; // true if data no read
};
......@@ -15,6 +15,7 @@
#include <sys/stat.h>
#include <iostream>
#include <fstream>
#include <time.h>
#define READ_AHEAD 2 //!< Number of time slice to read ahead(>=1)
......@@ -22,6 +23,8 @@
#define THREADS (READ_AHEAD+READ_BEHIND+1) //!< Number of threads
const struct timespec sleepwait = {0, 10000};
//! class to hold maxima and minima
template<class T>
class Maxmin{
......@@ -61,13 +64,13 @@ Maxmin<T>::~Maxmin()
template<class T>
class Master{
public:
Master(const char *fn,int s):slsz(s),fname(fn),maxtm(0),maxmin_ptr(NULL){}
Master(const char *fn,int s):slsz(s),fname(fn),maxtm(0),maxmin(NULL){}
FileType ftype; //!< What type of file is it?
string fname; //!< Argument sent to the class
char* scanstr; //!< Scan string
int slsz; //!< \#points in a time slice (not bytes)
int maxtm; //!< Max time (=\#slices-1)
Maxmin<T>* maxmin_ptr; //!< Pointer to maxmin structure
Maxmin<T>* maxmin; //!< Pointer to maxmin structure
};
......@@ -81,18 +84,18 @@ template<class T>
class Slave {
public:
Slave(Master<T>*m=NULL, int datasize=0);
~Slave(){ delete DataReader_ptr; delete[] data; }
~Slave(){ delete datareader; delete[] data; }
Master<T>* mthread; // Pointer to master
int rdtm; // Read time
T* data; // thread data buffer
int size; // size of data buffer
bool v_bit; // Valid bit for data
int unlock; // Indicates to post a semaphore in case when the thread is waiting.
pthread_t threadID; // Thread ID
sem_t semaphores; // Semaphore
sem_t slice; // Semaphore
sem_t start; // Semaphore to start slave task
sem_t done; // Semaphore to signal slave done
int unlock; // Indicates slave to post a done semaphore when finished
pthread_mutex_t mutex_slave; // Mutex to slave
DataReader<T>* DataReader_ptr; // pointer to derived DataReader class
DataReader<T>* datareader; // pointer to derived DataReader class
void resize( int s ){if(size)delete data; data = new T[s];size=s;}
void master(Master<T>*m,int s){mthread=m;resize(s);}
};
......@@ -101,8 +104,8 @@ template<class T>
Slave<T>::Slave( Master<T>*_mthread, int datasize ): mthread(_mthread),
size(datasize), rdtm(-10000), v_bit(false), unlock(0), data(NULL)
{
sem_init( &semaphores, 0, 0 );
sem_init( &slice, 0, 0 );
sem_init( &start, 0, 0 );
sem_init( &done, 0, 0 );
pthread_mutex_init ( &mutex_slave, NULL );
if( size ) data = new T[size];
}
......@@ -124,10 +127,9 @@ class ThreadedData : public DataClass<T> {
public:
ThreadedData( const char *fn, int slsz);
~ThreadedData( );
static void* ThreadCaller( void* _sthread ); //!< Thread to read in a requested time slice
static void* absCollector( void* _sthread ); //!< Thread to read in the absolute max and min times
static void* localCollector( void* _sthread ); //!< Thread to read in the local max and min times
static void* tmsrCollector( void* _sthread ); //!< Thread to read in a requested time series
static void* ThreadCaller( void* _sthread ); //!< read in a time slice
static void* minimax( void* _sthread ); //!< read max and min times
static void* tmsrCollector( void* _sthread ); //!< read in a time series
virtual T max(int); //!< Maximum value at a slice of time
virtual T max(); //!< Maximum value of the entire series
virtual T min(int); //!< Minimum value at a slice of time
......@@ -136,14 +138,14 @@ class ThreadedData : public DataClass<T> {
virtual void time_series( int, T* ); //!< Pointer to time series
virtual void increment(int increment); //!< Sets increment
private:
Master<T>* mthread; //!< Pointer to master
Slave<T>* sthread; //!< Pointer to slave
Slave<T>* stmsr; //!< Pointer to slave
Maxmin<T>* maxmin_ptr; //!< Pointer to maxmin
int incrementation; //!< Incremenationl value
Master<T>* mthread; //!< master thread
Slave<T>* sthread; //!< slice reading threads
Slave<T>* stmsr; //!< thread to read time series
Maxmin<T>* maxmin; //!< Pointer to maxmin
int incrementation; //!< Increment value
pthread_mutex_t mutex_incrementation; //!< Mutex to incrementation
int element; //!< Value to decide the next thread
gzFile in;
gzFile in; //!< file to read
bool replaceable( Slave<T>*, int );
};
......@@ -161,7 +163,7 @@ ThreadedData<T>::ThreadedData( const char *fn, int slsz ):
slice_size=slsz;
mthread->ftype = FileTypeFinder( fn );
mthread->maxmin_ptr = maxmin_ptr = new Maxmin<T>;
mthread->maxmin = maxmin = new Maxmin<T>;
// ugly but I don't know what else to do besides specialization which is ugly
if ( typeid(T) == typeid(double) ) mthread->scanstr = "%lf";
......@@ -190,34 +192,34 @@ ThreadedData<T>::ThreadedData( const char *fn, int slsz ):
////////////////////////////////////////////////////////////////////////////
switch ( mthread->ftype ) {
case FTIGB:
sabs->DataReader_ptr = new IGBreader<T>(mthread, sabs, maxmin_ptr);
slocal->DataReader_ptr = new IGBreader<T>(mthread, slocal, maxmin_ptr);
stmsr->DataReader_ptr = new IGBreader<T>(mthread, stmsr, maxmin_ptr);
sabs->datareader = new IGBreader<T>(mthread, sabs, maxmin);
slocal->datareader = new IGBreader<T>(mthread, slocal, maxmin);
stmsr->datareader = new IGBreader<T>(mthread, stmsr, maxmin);
for ( int k=0; k<THREADS; k++ )
sthread[k].DataReader_ptr = new IGBreader<T>( mthread,
sthread+k, maxmin_ptr);
sthread[k].datareader = new IGBreader<T>( mthread,
sthread+k, maxmin);
break;
case FTascii:
sabs->DataReader_ptr = new asciireader<T>(mthread, sabs, maxmin_ptr);
slocal->DataReader_ptr = new asciireader<T>(mthread, slocal, maxmin_ptr);
stmsr->DataReader_ptr = new asciireader<T>(mthread, stmsr, maxmin_ptr);
sabs->datareader = new asciireader<T>(mthread, sabs, maxmin);
slocal->datareader = new asciireader<T>(mthread, slocal, maxmin);
stmsr->datareader = new asciireader<T>(mthread, stmsr, maxmin);
for ( int k=0; k<THREADS; k++ )
sthread[k].DataReader_ptr = new asciireader<T>(mthread,
sthread+k, maxmin_ptr);
sthread[k].datareader = new asciireader<T>(mthread,
sthread+k, maxmin);
break;
case FTfileSeqCG:
{
map<int,string> CGfiles;
CG_file_list( CGfiles, fn );
sabs->DataReader_ptr = new FileSeqCGreader<T>( mthread,
sabs, maxmin_ptr, CGfiles );
slocal->DataReader_ptr = new FileSeqCGreader<T>( mthread, slocal,
maxmin_ptr, CGfiles);
stmsr->DataReader_ptr = new FileSeqCGreader<T>( mthread, stmsr,
maxmin_ptr, CGfiles);
sabs->datareader = new FileSeqCGreader<T>( mthread,
sabs, maxmin, CGfiles );
slocal->datareader = new FileSeqCGreader<T>( mthread, slocal,
maxmin, CGfiles);
stmsr->datareader = new FileSeqCGreader<T>( mthread, stmsr,
maxmin, CGfiles);
for ( int k=0; k<THREADS; k++ )
sthread[k].DataReader_ptr = new FileSeqCGreader<T>( mthread,
sthread+k, maxmin_ptr, CGfiles);
sthread[k].datareader = new FileSeqCGreader<T>( mthread,
sthread+k, maxmin, CGfiles);
}
break;
default:
......@@ -227,16 +229,12 @@ ThreadedData<T>::ThreadedData( const char *fn, int slsz ):
filename = fn;
// Finds and sets maxtm
sabs->DataReader_ptr->find_maxtm();
sabs->datareader->find_maxtm();
maxtm = mthread->maxtm;
maxmin_ptr->size(maxtm+1);
// Call a thread to read in global max min times
//if ( pthread_create( &sabs->threadID, NULL, absCollector, (void*)sabs) )
//throw(1);
maxmin->size(maxtm+1);
// Call a thread to read in each time slice and get max&min times
if(pthread_create(&(slocal->threadID), NULL, localCollector, (void*)slocal) )
if(pthread_create(&(slocal->threadID), NULL, minimax, (void*)slocal) )
throw(1);
for ( int i=0; i<THREADS; i++ ){
......@@ -245,7 +243,7 @@ ThreadedData<T>::ThreadedData( const char *fn, int slsz ):
throw(1);
}
// Create a thread to read in slices
// Create a thread to read in time series
if( pthread_create( &stmsr->threadID, NULL, tmsrCollector, (void*)stmsr) )
throw(1);
}
......@@ -254,32 +252,32 @@ ThreadedData<T>::ThreadedData( const char *fn, int slsz ):
template<class T>