diff options
| author | deva <deva> | 2005-10-04 21:39:53 +0000 | 
|---|---|---|
| committer | deva <deva> | 2005-10-04 21:39:53 +0000 | 
| commit | 111b16802de661228ef414eb6ec7484e2fa186ed (patch) | |
| tree | d6d1ce9e8b857371414f33ff460266258c462275 /src | |
| parent | 9640339f2e9dc126406f6b6f8a091b924898b4f5 (diff) | |
*** empty log message ***
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile.am | 8 | ||||
| -rw-r--r-- | src/audio_encoder.cc | 125 | ||||
| -rw-r--r-- | src/audio_encoder.h | 19 | ||||
| -rw-r--r-- | src/frame.h | 19 | ||||
| -rw-r--r-- | src/mov_encoder.cc | 161 | ||||
| -rw-r--r-- | src/mov_encoder.h | 30 | ||||
| -rw-r--r-- | src/mov_encoder_thread.cc | 76 | ||||
| -rw-r--r-- | src/mov_encoder_thread.h | 25 | ||||
| -rw-r--r-- | src/mov_encoder_writer.cc | 22 | ||||
| -rw-r--r-- | src/mov_encoder_writer.h | 25 | ||||
| -rw-r--r-- | src/multicast.cc | 109 | ||||
| -rw-r--r-- | src/multicast.h | 47 | ||||
| -rw-r--r-- | src/multiplexer.cc | 52 | ||||
| -rw-r--r-- | src/multiplexer.h | 14 | ||||
| -rw-r--r-- | src/threadsafe_queue.cc | 44 | ||||
| -rw-r--r-- | src/threadsafe_queue.h | 56 | ||||
| -rw-r--r-- | src/threadsafe_queue_fifo.cc | 83 | ||||
| -rw-r--r-- | src/threadsafe_queue_fifo.h | 82 | ||||
| -rw-r--r-- | src/threadsafe_queue_priority.cc | 93 | ||||
| -rw-r--r-- | src/threadsafe_queue_priority.h | 64 | 
20 files changed, 921 insertions, 233 deletions
| diff --git a/src/Makefile.am b/src/Makefile.am index 3adaf25..b0ed05f 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -34,6 +34,7 @@ miav_SOURCES = $(shell  if [ $QT_CXXFLAGS ] ; then ../tools/MocList cc; fi ) \  	mov_encoder.cc \  	mov_encoder_thread.cc \  	mov_encoder_writer.cc \ +	multicast.cc \  	multiplexer.cc \  	network.cc \  	player.cc \ @@ -41,6 +42,9 @@ miav_SOURCES = $(shell  if [ $QT_CXXFLAGS ] ; then ../tools/MocList cc; fi ) \  	server_status.cc \  	socket.cc \  	thread.cc \ +	threadsafe_queue.cc \ +	threadsafe_queue_fifo.cc \ +	threadsafe_queue_priority.cc \  	util.cc \  	videowidget.cc \  	yuv_draw.cc @@ -81,6 +85,7 @@ EXTRA_DIST = \  	mov_encoder.h \  	mov_encoder_thread.h \  	mov_encoder_writer.h \ +	multicast.h \  	multiplexer.h \  	network.h \  	package.h \ @@ -90,6 +95,9 @@ EXTRA_DIST = \  	server_status.h \  	socket.h \  	thread.h \ +	threadsafe_queue.h \ +	threadsafe_queue_fifo.h \ +	threadsafe_queue_priority.h \  	util.h \  	videowidget.h \  	yuv_draw.h diff --git a/src/audio_encoder.cc b/src/audio_encoder.cc index 8972e9a..97c6084 100644 --- a/src/audio_encoder.cc +++ b/src/audio_encoder.cc @@ -28,8 +28,10 @@  #include "audio_encoder.h"  #include "util.h" -AudioEncoder::AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, sem_t *in_sem, -                           FramePriorityQueue *out, pthread_mutex_t *out_mutex, sem_t *out_sem, +#include "liblame_wrapper.h" + +AudioEncoder::AudioEncoder(ThreadSafeQueuePriority *audio_input_queue, +                           ThreadSafeQueuePriority *audio_output_queue,                             Info *i)  {    info = i; @@ -37,18 +39,8 @@ AudioEncoder::AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, se    running = true; -  // Queues -  inputqueue = in; -  outputqueue = out; - -  // Queue mutexes -  input_mutex = in_mutex; -  output_mutex = out_mutex; - -  input_sem = in_sem; -  output_sem = out_sem; - -  frame_number = 0; +  input_queue = audio_input_queue; +  output_queue = audio_output_queue;  }  AudioEncoder::~AudioEncoder() @@ -59,18 +51,109 @@ void AudioEncoder::thread_main()  {    info->info("AudioEncoder::run"); -  unsigned int queuesize = 0; -    // Run with slightly lower priority than MovEncoderWriter    //  nice(2);    Frame *in_frame = NULL;    Frame *out_frame = NULL; + +  LibLAMEWrapper lame(info); + +  while(running) { +    in_frame = input_queue->pop(); + +    if(in_frame == NULL) info->error("AudioEncoder: in_frame == NULL!"); + +    // Check for end of stream +    if(in_frame->endOfFrameStream == true) { +      info->info("endOfFrameStream in AudioEncoder"); +      running = false; +      out_frame = lame.close(); +    } else { +      // Encode audio +      out_frame = lame.encode(in_frame); +    } +    out_frame->number = in_frame->number; +    out_frame->endOfFrameStream = in_frame->endOfFrameStream; + +    //delete in_frame; +    in_frame = NULL; + +    output_queue->push(out_frame); +  } + +  info->info("AudioEncoder::stop"); +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +/* + +void AudioEncoder::thread_main() +{ +  info->info("AudioEncoder::run"); + +#ifndef NEW_QUEUE +  unsigned int queuesize = 0;    Frame *tmpframe; +#endif + +  // Run with slightly lower priority than MovEncoderWriter +  nice(2); + +  Frame *in_frame = NULL; +  Frame *out_frame = NULL;    LibLAMEWrapper lame(info);    while(running) { +    info->info("fisk"); +#ifdef NEW_QUEUE +    in_frame = input_queue->pop(); +#else      sem_wait(input_sem);      // If no frame is in the buffer, get one from the queue @@ -94,7 +177,8 @@ void AudioEncoder::thread_main()        sleep_0_2_frame();      } -     +#endif +      // Check for end of stream      if(in_frame->endOfFrameStream == true) {        info->info("endOfFrameStream in AudioEncoder"); @@ -110,6 +194,9 @@ void AudioEncoder::thread_main()      delete in_frame;      in_frame = NULL; +#ifdef NEW_QUEUE +    output_queue->push(out_frame); +#else      // Lock output mutex      pthread_mutex_lock(output_mutex);      outputqueue->push(out_frame); @@ -118,10 +205,14 @@ void AudioEncoder::thread_main()      // Kick multiplexer (audio)      sem_post(output_sem); +#endif    } +#ifndef NEW_QUEUE    // Kick multiplexer (audio)    sem_post(output_sem); +#endif    info->info("AudioEncoder::stop");  } +*/ diff --git a/src/audio_encoder.h b/src/audio_encoder.h index b15ce45..9d86178 100644 --- a/src/audio_encoder.h +++ b/src/audio_encoder.h @@ -36,12 +36,12 @@  #include "info.h" -#include "liblame_wrapper.h" +#include "threadsafe_queue_priority.h"  class AudioEncoder : public Thread {  public: -  AudioEncoder(FramePriorityQueue *in, pthread_mutex_t *in_mutex, sem_t *in_sem, -               FramePriorityQueue *out, pthread_mutex_t *out_mutex, sem_t *out_sem, +  AudioEncoder(ThreadSafeQueuePriority *audio_input_queue, +               ThreadSafeQueuePriority *audio_output_queue,                 Info *info);    ~AudioEncoder(); @@ -50,19 +50,10 @@ public:    volatile bool running;  private: -  unsigned int frame_number; -    Info *info; -  // Input/Output queues -  FramePriorityQueue *inputqueue; -  FramePriorityQueue *outputqueue; -  pthread_mutex_t *input_mutex; -  pthread_mutex_t *output_mutex; - -  //thread stuff -  sem_t *input_sem; -  sem_t *output_sem; +  ThreadSafeQueuePriority *input_queue; +  ThreadSafeQueuePriority *output_queue;  }; diff --git a/src/frame.h b/src/frame.h index 2b8d9f5..6859116 100644 --- a/src/frame.h +++ b/src/frame.h @@ -31,9 +31,6 @@  // Definition of vector  #include <vector> -// Definition of priority_queue -#include <queue> -  class Frame {  public:    Frame(unsigned char *d, int sz); @@ -54,22 +51,6 @@ public:    bool endOfFrameStream;  }; -#include <functional> - -// Method for use, when comparing Frames in priority queue. -template <typename T> -struct frame_priority : std::binary_function<T, T, bool> { -  bool operator() (const T& a, const T& b) const { -    return ((Frame*)a)->number > ((Frame*)b)->number; -  } -}; - -// Additional helper types.  typedef std::vector< Frame* > FrameVector; -typedef std::queue< FrameVector* > FrameVectorQueue; -typedef std::priority_queue< Frame*,  -                             std::vector<Frame*>,  -                             frame_priority<Frame*> > FramePriorityQueue; -  #endif/*__FRAME_H__*/ diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc index cf45ae0..a455f42 100644 --- a/src/mov_encoder.cc +++ b/src/mov_encoder.cc @@ -42,11 +42,12 @@  #include "miav_config.h"  #include "debug.h" +#include "libfame_wrapper.h"  MovEncoder::MovEncoder(volatile bool *r, sem_t *r_sem, -                       FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex,  -                       FramePriorityQueue *v_out, pthread_mutex_t *v_out_mutex, sem_t *v_out_sem, -                       FramePriorityQueue *a_out, pthread_mutex_t *a_out_mutex, sem_t *a_out_sem, +                       ThreadSafeQueueFIFO<FrameVector*> *in, +                       ThreadSafeQueuePriority *video_out, +                       ThreadSafeQueuePriority *audio_out,                         Info *i)  {    info = i; @@ -56,18 +57,9 @@ MovEncoder::MovEncoder(volatile bool *r, sem_t *r_sem,    // Queues    inputqueue = in; -  video_outputqueue = v_out; -  audio_outputqueue = a_out; +  video_output_queue = video_out; +  audio_output_queue = audio_out; -  // Queue mutexes -  input_mutex = in_mutex; -  video_output_mutex = v_out_mutex; -  audio_output_mutex = a_out_mutex; - -  input_sem = in_sem; -  video_output_sem = v_out_sem; -  audio_output_sem = a_out_sem; -      read_sem = r_sem;  } @@ -82,12 +74,131 @@ void MovEncoder::thread_main()  {    info->info("MovEncoder::run");    //  static volatile int test = 0; +  //  int insize = 0; + +  // Run with slightly lower priority than MovEncoderWriter AND AudioEncoder +  //  nice(3); + +  FrameVector *item; +  Frame *in_frame; +  Frame *out_v_frame; +  Frame *out_a_frame; + +  LibFAMEWrapper fame(info); + +  // Process until running == false and the queue is empty +  while(*running) { + +    item = inputqueue->pop(); + +    if(item) { +      for(unsigned int cnt = 0; cnt < item->size(); cnt++) { +        in_frame = item->at(cnt); + +        // Check for end of stream +        if(in_frame->endOfFrameStream == true) { +          info->info("endOfFrameStream in MovEncoder"); + +          // Signal to stop running +          *running = false; + +          // Kick them sleepy ones so they get the message. +          int threads = config->readInt("encoding_threads"); +          for(int cnt = 0; cnt < threads; cnt++) {/*sem_post(input_sem);*/} // FIXME: Kick the other encoders +        } + +        // Encode video +        out_v_frame = fame.encode(in_frame); +        out_v_frame->number = in_frame->number; +        out_v_frame->endOfFrameStream = in_frame->endOfFrameStream; +     +        // Create audio frame +        out_a_frame = in_frame; +  +        video_output_queue->push(out_v_frame); +        audio_output_queue->push(out_a_frame); +      } + +      delete item; + +      item = NULL; + +      // Kick reader +      sem_post(read_sem); +    } +  } + +  info->info("MovEncoder::stop"); +} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +/* + +// this runs in a thread +void MovEncoder::thread_main() +{ +  info->info("MovEncoder::run"); +  //  static volatile int test = 0; +#ifndef NEW_QUEUE    int v_outsize = 0;    int a_outsize = 0; +#endif    int insize = 0;    // Run with slightly lower priority than MovEncoderWriter AND AudioEncoder -  //nice(3); +  nice(3);    FrameVector *item;    Frame *in_frame; @@ -131,16 +242,20 @@ void MovEncoder::thread_main()          // Create audio frame          out_a_frame = in_frame; +#ifdef NEW_QUEUE         +        video_output_queue->push(out_v_frame); +        audio_output_queue->push(out_a_frame); +#else          // Lock output mutex          pthread_mutex_lock(video_output_mutex);          video_outputqueue->push(out_v_frame);          v_outsize = video_outputqueue->size();          pthread_mutex_unlock(video_output_mutex);          // Unlock output mutex -         +          // Kick multiplexer (video)          sem_post(video_output_sem); -         +          // Lock output mutex          pthread_mutex_lock(audio_output_mutex);          audio_outputqueue->push(out_a_frame); @@ -150,6 +265,7 @@ void MovEncoder::thread_main()          // Kick audio encoder          sem_post(audio_output_sem); +#endif        }        delete item; @@ -159,16 +275,19 @@ void MovEncoder::thread_main()        sem_post(read_sem);      }    } -  /* -  info->info("Input pool size: %d, video output pool size: %d, audio output pool size: %d",  -             insize, v_outsize, a_outsize); -  */ +   +  //info->info("Input pool size: %d, video output pool size: %d, audio output pool size: %d",  +  //           insize, v_outsize, a_outsize); +   +#ifndef NEW_QUEUE    // Kick audio encoder    sem_post(audio_output_sem);    // Kick multiplexer (video)    sem_post(video_output_sem); +#endif    info->info("MovEncoder::stop");  } +*/ diff --git a/src/mov_encoder.h b/src/mov_encoder.h index 8488008..ace016c 100644 --- a/src/mov_encoder.h +++ b/src/mov_encoder.h @@ -48,15 +48,15 @@ using namespace std;  #include "info.h" -#include "libfame_wrapper.h" -//#include "liblame_wrapper.h" +#include "threadsafe_queue_priority.h" +#include "threadsafe_queue_fifo.h"  class MovEncoder : public Thread {  public:    MovEncoder(volatile bool *r, sem_t *r_sem, -             FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex, -             FramePriorityQueue *v_out, pthread_mutex_t *v_out_mutex, sem_t *v_out_sem, -             FramePriorityQueue *a_out, pthread_mutex_t *a_out_mutex, sem_t *a_out_sem, +             ThreadSafeQueueFIFO< FrameVector*>  *in, +             ThreadSafeQueuePriority *video_out, +             ThreadSafeQueuePriority *audio_out,               Info *info);    ~MovEncoder(); @@ -65,24 +65,16 @@ public:    volatile bool *running;  private: -  //  LibFAMEWrapper *fame; -  //  LibLAMEWrapper *lame; -    Info *info; -  // Input/Output queues -  FrameVectorQueue *inputqueue; -  FramePriorityQueue *video_outputqueue; -  FramePriorityQueue *audio_outputqueue; -  pthread_mutex_t *input_mutex; -  pthread_mutex_t *video_output_mutex; -  pthread_mutex_t *audio_output_mutex; +  // Input queue +  ThreadSafeQueueFIFO< FrameVector* > *inputqueue; -  //thread stuff -  sem_t *input_sem; -  sem_t *video_output_sem; -  sem_t *audio_output_sem; +  // Output queues +  ThreadSafeQueuePriority *video_output_queue; +  ThreadSafeQueuePriority *audio_output_queue; +  // Reader (mov_encoder_thread.cc) semaphore    sem_t *read_sem;  }; diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc index dab308d..7c7f5d9 100644 --- a/src/mov_encoder_thread.cc +++ b/src/mov_encoder_thread.cc @@ -34,17 +34,19 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i)    info = i;    info->info("MovEncoderThread"); -  // Queues -  inputqueue = new FrameVectorQueue(); -  video_outputqueue = new FramePriorityQueue(); -  audio_inputqueue = new FramePriorityQueue(); -  audio_outputqueue = new FramePriorityQueue(); - -  // Queue mutexes -  pthread_mutex_init (&input_mutex, NULL); -  pthread_mutex_init (&video_output_mutex, NULL); -  pthread_mutex_init (&audio_input_mutex, NULL); -  pthread_mutex_init (&audio_output_mutex, NULL); +  // Queue +  inputqueue = new ThreadSafeQueueFIFO<FrameVector*>(); + +  // Initialize read semaphore +	sem_init(&read_sem, 0, 0); + +  video_output_queue = new ThreadSafeQueuePriority(info); +  audio_input_queue = new ThreadSafeQueuePriority(info); +  audio_output_queue = new ThreadSafeQueuePriority(info); + +  info->info("video_output_queue: 0x%x", video_output_queue); +  info->info("audio_input_queue: 0x%x", audio_input_queue); +  info->info("audio_output_queue: 0x%x", audio_output_queue);    block = new FrameVector(); @@ -53,13 +55,6 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i)    info->info("Frame sequence length %d", num_frames_in_block);    threads = config->readInt("encoding_threads"); - -  // Thread stuff -	sem_init(&in_sem, 0, 0); -	sem_init(&video_out_sem, 0, 0); -	sem_init(&audio_in_sem, 0, 0); -	sem_init(&audio_out_sem, 0, 0); -	sem_init(&read_sem, 0, 0);    movencodersrunning = true; @@ -67,26 +62,25 @@ MovEncoderThread::MovEncoderThread(const char *cpr, Info *i)    // Create the video encoders    for(int cnt = 0; cnt < threads; cnt++) { -    MovEncoder *movenc =  -      new MovEncoder(&movencodersrunning, &read_sem, -                     inputqueue, &in_sem, &input_mutex, -                     video_outputqueue, &video_output_mutex, &video_out_sem, -                     audio_inputqueue, &audio_input_mutex, &audio_in_sem, -                     info); +    MovEncoder *movenc = new MovEncoder(&movencodersrunning, &read_sem, +                                        inputqueue, +                                        video_output_queue, +                                        audio_input_queue, +                                        info);      movenc->run();      encs.push_back(movenc);    }    // Create the audio encoder -  audioenc = new AudioEncoder(audio_inputqueue, &audio_input_mutex, &audio_in_sem, -                              audio_outputqueue, &audio_output_mutex, &audio_out_sem, +  audioenc = new AudioEncoder(audio_input_queue, +                              audio_output_queue,                                info);    audioenc->run();    // Create the multiplexer    writer = new MovEncoderWriter(cpr, -                                video_outputqueue, &video_output_mutex, &video_out_sem, -                                audio_outputqueue, &audio_output_mutex, &audio_out_sem, +                                video_output_queue, +                                audio_output_queue,                                  info);    writer->run(); @@ -99,7 +93,7 @@ MovEncoderThread::~MovEncoderThread()    info->info("~MovEncoderThread");    // First we destroy the movie encoders -  for(int cnt = 0; cnt < threads; cnt++) sem_post(&in_sem); // Kick them +  //  for(int cnt = 0; cnt < threads; cnt++) sem_post(&in_sem); // Kick them    for(int cnt = 0; cnt < threads; cnt++) {      encs[cnt]->wait_stop();    // Wait for it to stop      delete encs[cnt];    // Delete it @@ -108,7 +102,6 @@ MovEncoderThread::~MovEncoderThread()    info->info("Deleted the movie encoders");    // Then we destroy the audio encoder -  sem_post(&audio_in_sem); // Kick it    audioenc->wait_stop();  // Wait for it to stop.    delete audioenc;  // delete the audio encoder @@ -116,18 +109,15 @@ MovEncoderThread::~MovEncoderThread()    // Finally we destroy the writer.    writer->running = false; -  sem_post(&video_out_sem);  // Kick it to make it stop. -  sem_post(&audio_out_sem);  // Kick it to make it stop. + +  // FIXME: Post writer    writer->wait_stop(); // Wait for it to stop.    delete writer;  // delete the writer (end thereby close the file)    info->info("Deleted the writer");    // Destroy the semaphores. -  sem_destroy(&in_sem); -  sem_destroy(&video_out_sem); -  sem_destroy(&audio_in_sem); -  sem_destroy(&audio_out_sem); +  // sem_destroy(&in_sem);    sem_destroy(&read_sem);    info->info("~MovEncoderThread::done"); @@ -139,12 +129,11 @@ void MovEncoderThread::encode(Frame* frame)    if(output % 250 == 0) // 25 * 24      info->info("inputqueue: %d\tvideo_outputqueue: %d\taudio_inputqueue: %d\taudio_outputqueue: %d.",                 inputqueue->size(), -               video_outputqueue->size(), -               audio_inputqueue->size(), -               audio_outputqueue->size()); +               video_output_queue->size(), +               audio_input_queue->size(), +               audio_output_queue->size());    output++; -    if(frame == NULL) {      info->info("MovEncoderThread::encode - NULL frame detected.");      // Terminate @@ -159,14 +148,7 @@ void MovEncoderThread::encode(Frame* frame)      // Wait until a free encoder.      sem_wait(&read_sem); -    // Lock input mutex -    pthread_mutex_lock(&input_mutex);      inputqueue->push(block); -    pthread_mutex_unlock(&input_mutex); -    // Unlock input mutex - -    // Kick encoders -    sem_post(&in_sem);      // Start new block      block = new FrameVector; diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h index e3fba27..8cc24f8 100644 --- a/src/mov_encoder_thread.h +++ b/src/mov_encoder_thread.h @@ -36,6 +36,11 @@  #include <vector>  using namespace std; +#include "frame.h" + +#include "threadsafe_queue_priority.h" +#include "threadsafe_queue_fifo.h" +  #include "mov_encoder.h"  #include "audio_encoder.h"  #include "mov_encoder_writer.h" @@ -52,24 +57,16 @@ public:  private:    Info *info; -  FrameVectorQueue *inputqueue; -  FramePriorityQueue *video_outputqueue; -  FramePriorityQueue *audio_inputqueue; -  FramePriorityQueue *audio_outputqueue; +  //  FrameVectorQueue *inputqueue; +  ThreadSafeQueueFIFO< FrameVector* > *inputqueue;    FrameVector *block;    //thread stuff -  sem_t in_sem; -  sem_t video_out_sem; -  sem_t audio_in_sem; -  sem_t audio_out_sem; -    sem_t read_sem; -  pthread_mutex_t input_mutex; -  pthread_mutex_t video_output_mutex; -  pthread_mutex_t audio_input_mutex; -  pthread_mutex_t audio_output_mutex; +  ThreadSafeQueuePriority *video_output_queue; +  ThreadSafeQueuePriority *audio_input_queue; +  ThreadSafeQueuePriority *audio_output_queue;    volatile bool movencodersrunning; @@ -79,8 +76,6 @@ private:    unsigned int num_frames_in_block;    MovEncoderWriter *writer; -  //  pthread_t* writer_tid; -    AudioEncoder* audioenc;    int threads; diff --git a/src/mov_encoder_writer.cc b/src/mov_encoder_writer.cc index 717998a..732f9ba 100644 --- a/src/mov_encoder_writer.cc +++ b/src/mov_encoder_writer.cc @@ -47,8 +47,8 @@ using namespace std;  #include "multiplexer.h"  MovEncoderWriter::MovEncoderWriter(const char* cpr,  -                                   FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s,  -                                   FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s,  +                                   ThreadSafeQueuePriority *video_q, +                                   ThreadSafeQueuePriority *audio_q,                                     Info *i)  {    info = i; @@ -74,23 +74,15 @@ MovEncoderWriter::MovEncoderWriter(const char* cpr,    ltime = localtime(&t);    sprintf(date, "%.4d%.2d%.2d",             ltime->tm_year + 1900,  -          ltime->tm_mon,  +          ltime->tm_mon + 1,  // Ranging from 0 to 11            ltime->tm_mday);    sprintf(fname, "%s/%s/%s/%s-%s-", server_root->c_str(), birthmonth, cpr, cpr, date);    file = new File(fname, "mpg", info); -  video_queue = v_q; -  video_sem = v_s; -  video_mutex = v_m; - -  audio_queue = a_q; -  audio_sem = a_s; -  audio_mutex = a_m; - -  video_frame_number = 0; -  audio_frame_number = 0; +  video_queue = video_q; +  audio_queue = audio_q;    running = true;  } @@ -107,8 +99,8 @@ void MovEncoderWriter::thread_main()    info->info("MovEncoderWriter::run");    Multiplexer multiplexer(file, info, &running,  -                          video_queue, video_mutex, video_sem, -                          audio_queue, audio_mutex, audio_sem); +                          video_queue, +                          audio_queue);    multiplexer.multiplex();    info->info("MovEncoderWriter::stop"); diff --git a/src/mov_encoder_writer.h b/src/mov_encoder_writer.h index e653223..3146bf8 100644 --- a/src/mov_encoder_writer.h +++ b/src/mov_encoder_writer.h @@ -33,6 +33,8 @@  #include "file.h"  #include "info.h" +#include "threadsafe_queue_priority.h" +  #include <string>  using namespace std; @@ -42,8 +44,8 @@ using namespace std;  class MovEncoderWriter : public Thread {  public:    MovEncoderWriter(const char* cpr, -                   FramePriorityQueue *video_queue, pthread_mutex_t *video_mutex, sem_t *video_sem,  -                   FramePriorityQueue *audio_queue, pthread_mutex_t *audio_mutex, sem_t *audio_sem,  +                   ThreadSafeQueuePriority *video_queue, +                   ThreadSafeQueuePriority *audio_queue,                     Info *info);    ~MovEncoderWriter(); @@ -56,23 +58,8 @@ private:    File *file; -  FramePriorityQueue *video_queue; -  FramePriorityQueue *audio_queue; -  pthread_mutex_t *video_mutex; -  pthread_mutex_t *audio_mutex; -  sem_t *video_sem; -  sem_t *audio_sem; - -  unsigned int video_frame_number; -  unsigned int audio_frame_number; - -  //  Timecode_struc SCR; -  //  double timestamp; - -//  void write_audio_header(unsigned short int packetsize); -//  void write_video_header(unsigned short int packetsize); -//  void write_system_header(unsigned int audio_size, unsigned int video_size); -//  void write_packet_header(unsigned int audio_size, unsigned int video_size); +  ThreadSafeQueuePriority *video_queue; +  ThreadSafeQueuePriority *audio_queue;  }; diff --git a/src/multicast.cc b/src/multicast.cc new file mode 100644 index 0000000..0cf1b87 --- /dev/null +++ b/src/multicast.cc @@ -0,0 +1,109 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + *            multicast.cc + * + *  Mon Sep 26 12:25:22 CEST 2005 + *  Copyright  2005 Bent Bisballe Nyeng + *  deva@aasimon.org + ****************************************************************************/ + +/* + *  This file is part of MIaV. + * + *  MIaV is free software; you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation; either version 2 of the License, or + *  (at your option) any later version. + * + *  MIaV is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with MIaV; if not, write to the Free Software + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. + */ +#include "config.h" +#include "multicast.h" + +#include <netinet/in.h> +#include <netdb.h> +#include <sys/socket.h> +#include <sys/param.h> +#include <arpa/inet.h> + +Multicast::Multicast(Info *i) +{ +  char addr[] = "192.168.0.10"; +  int port = 666; + +  info = i; +  if(!UDPOpen(addr, port)) info->error("Error creating socket %s:%d", addr, port); +} + +Multicast::~Multicast() +{ +} + +void Multicast::Write(char* buf, int size) +{ +  if(write(sock, buf, size) != size) info->error("Error Writing to socket."); +} + + +/* + * open UDP socket + */ +bool Multicast::UDPOpen(char * address, int port) +{ +  int enable = 1L; +  struct sockaddr_in stAddr; +  struct sockaddr_in stLclAddr; +  struct hostent * host; +  //  int sock; +   +  stAddr.sin_family = AF_INET; +  stAddr.sin_port = htons(port); +  if((host = gethostbyname(address)) == NULL) return false; +  stAddr.sin_addr = *((struct in_addr *) host->h_addr_list[0]); + +  /* Create a UDP socket */ +  if((sock = socket(AF_INET, SOCK_DGRAM, 0)) < 0) +        return false; + +  /* Allow multiple instance of the client to share the same address and port */  if(setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (char *) &enable, sizeof(unsigned long int)) < 0) return false; + +#ifdef USE_MULTICAST +  /* If the address is multicast, register to the multicast group */ +  if(is_address_multicast(stAddr.sin_addr.s_addr)) +  { +    struct ip_mreq stMreq; +   +    /* Bind the socket to port */ +    stLclAddr.sin_family      = AF_INET; +    stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY); +    stLclAddr.sin_port        = stAddr.sin_port; +    if(bind(sock, (struct sockaddr*) & stLclAddr, sizeof(stLclAddr)) < 0) return false; + +    /* Register to a multicast address */ +    stMreq.imr_multiaddr.s_addr = stAddr.sin_addr.s_addr; +    stMreq.imr_interface.s_addr = INADDR_ANY; +    if(setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *) & stMreq, sizeof(stMreq)) < 0)  +      return false; +  } +  else +#endif +  { +    /* Bind the socket to port */ +    stLclAddr.sin_family      = AF_INET; +    stLclAddr.sin_addr.s_addr = htonl(INADDR_ANY); +    stLclAddr.sin_port        = htons(0); +    if(bind(sock, (struct sockaddr*) & stLclAddr, sizeof(stLclAddr)) < 0) +        return false; +  } + +  connect(sock, (struct sockaddr*) & stAddr, sizeof(stAddr)); + +  return true; +} diff --git a/src/multicast.h b/src/multicast.h new file mode 100644 index 0000000..f0a4979 --- /dev/null +++ b/src/multicast.h @@ -0,0 +1,47 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + *            multicast.h + * + *  Mon Sep 26 12:25:22 CEST 2005 + *  Copyright  2005 Bent Bisballe Nyeng + *  deva@aasimon.org + ****************************************************************************/ + +/* + *  This file is part of MIaV. + * + *  MIaV is free software; you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation; either version 2 of the License, or + *  (at your option) any later version. + * + *  MIaV is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with MIaV; if not, write to the Free Software + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. + */ +#include "config.h" +#ifndef __MIAV_MULTICAST_H__ +#define __MIAV_MULTICAST_H__ + +#include "info.h" + +class Multicast { +public: +  Multicast(Info *info); +  ~Multicast(); + +  void Write(char* buf, int size); + +private: +  Info *info; + +  bool UDPOpen(char * address, int port); +  int sock; +}; + +#endif/*__MIAV_MULTICAST_H__*/ diff --git a/src/multiplexer.cc b/src/multiplexer.cc index d2aecfc..0b54bf8 100644 --- a/src/multiplexer.cc +++ b/src/multiplexer.cc @@ -68,37 +68,27 @@ static double picture_rate_index[16] = {    RESERVED, RESERVED, RESERVED, RESERVED, RESERVED, RESERVED, RESERVED  };  */ -  Multiplexer::Multiplexer(File *f, Info *i, volatile bool *r, -                         FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s,  -                         FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s) +                         ThreadSafeQueuePriority *video_q, +                         ThreadSafeQueuePriority *audio_q)  {    running = r;    file = f;    info = i; -  queue[TYPE_VIDEO] = v_q; -  queue[TYPE_AUDIO] = a_q; - -  sem[TYPE_VIDEO] = v_s; -  sem[TYPE_AUDIO] = a_s; - -  mutex[TYPE_VIDEO] = v_m; -  mutex[TYPE_AUDIO] = a_m; -    frame[TYPE_VIDEO] = NULL; -  frame[TYPE_AUDIO] = NULL; +  written[TYPE_VIDEO] = 0.0; -  frame_number[TYPE_VIDEO] = 0; -  frame_number[TYPE_AUDIO] = 0; +  frame[TYPE_AUDIO] = NULL; +  written[TYPE_AUDIO] = 0.0; -  write_system_header = 0;    write_audio_packet = 0; +  write_system_header = 0;    audio_header_read = false; -  written[TYPE_VIDEO] = 0.0; -  written[TYPE_AUDIO] = 0.0; +  queue[TYPE_VIDEO] = video_q; +  queue[TYPE_AUDIO] = audio_q;    SCR = 3904;//0x40010003LL;//0x1E80; @@ -111,29 +101,11 @@ Multiplexer::~Multiplexer()  Frame *Multiplexer::getFrame(StreamType type)  { -  Frame *tmpframe; -  Frame *frame = NULL; - -  sem_wait(sem[type]); - -  while( frame == NULL ) { -    // Lock output mutex -    pthread_mutex_lock( mutex[type] ); -    tmpframe = queue[type]->top(); -     -    if(tmpframe && tmpframe->number == frame_number[type] ) { -      queue[type]->pop(); -      frame = tmpframe; -      frame_number[type]++; -      read[type] = 0; -    } -     -    pthread_mutex_unlock( mutex[type] ); -    // Unlock output mutex +  info->info("Get %s Frame", type==TYPE_AUDIO?"Audio\0":"Video\0"); +   +  read[type] = 0; -    sleep_0_2_frame(); -  } -  return frame; +  return queue[type]->pop();  }  int Multiplexer::read_stream(char *buf, unsigned int size, StreamType type) diff --git a/src/multiplexer.h b/src/multiplexer.h index 8d67766..2604ddc 100644 --- a/src/multiplexer.h +++ b/src/multiplexer.h @@ -36,6 +36,8 @@  #include "info.h"  #include "frame.h" +#include "threadsafe_queue_priority.h" +  /**   * Multiplexer configuration   */ @@ -67,8 +69,8 @@ typedef enum {  class Multiplexer {  public:    Multiplexer(File *file, Info *info, volatile bool *running, -              FramePriorityQueue *v_q, pthread_mutex_t *v_m, sem_t *v_s,  -              FramePriorityQueue *a_q, pthread_mutex_t *a_m, sem_t *a_s); +              ThreadSafeQueuePriority *video_queue, +              ThreadSafeQueuePriority *audio_queue);    ~Multiplexer();    void multiplex(); @@ -102,10 +104,6 @@ private:    Frame *getFrame(StreamType type);    int read_stream(char *buf, unsigned int size, StreamType type); -  FramePriorityQueue *queue[NUM_TYPES]; -  pthread_mutex_t *mutex[NUM_TYPES]; -  sem_t *sem[NUM_TYPES]; -    Frame *frame[NUM_TYPES];    unsigned int frame_number[NUM_TYPES];    unsigned int read[NUM_TYPES]; @@ -115,7 +113,9 @@ private:    volatile bool *running;    // Audio Header - bool audio_header_read; +  bool audio_header_read; + +  ThreadSafeQueuePriority *queue[NUM_TYPES];  };  #endif/*__MIAV_MULTIPLEXER_H__*/ diff --git a/src/threadsafe_queue.cc b/src/threadsafe_queue.cc new file mode 100644 index 0000000..89f2d6a --- /dev/null +++ b/src/threadsafe_queue.cc @@ -0,0 +1,44 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + *            threadsafe_queue.cc + * + *  Tue Sep 27 14:43:45 CEST 2005 + *  Copyright  2005 Bent Bisballe Nyeng + *  deva@aasimon.org + ****************************************************************************/ + +/* + *  This file is part of MIaV. + * + *  MIaV is free software; you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation; either version 2 of the License, or + *  (at your option) any later version. + * + *  MIaV is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with MIaV; if not, write to the Free Software + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. + */ +#include "config.h" +#include "threadsafe_queue.h" +/* +template <typename T> +ThreadSafeQueue<T>::ThreadSafeQueue() +{ +  pthread_mutex_init (&mutex, NULL); +  sem_init(&semaphore, 0, 0); +} + +template <typename T> +ThreadSafeQueue<T>::~ThreadSafeQueue() +{ +  pthread_mutex_destroy(&mutex); +  sem_destroy(&semaphore); +} + +*/ diff --git a/src/threadsafe_queue.h b/src/threadsafe_queue.h new file mode 100644 index 0000000..616e81e --- /dev/null +++ b/src/threadsafe_queue.h @@ -0,0 +1,56 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + *            threadsafe_queue.h + * + *  Tue Sep 27 14:01:01 CEST 2005 + *  Copyright  2005 Bent Bisballe Nyeng + *  deva@aasimon.org + ****************************************************************************/ + +/* + *  This file is part of MIaV. + * + *  MIaV is free software; you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation; either version 2 of the License, or + *  (at your option) any later version. + * + *  MIaV is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with MIaV; if not, write to the Free Software + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. + */ +#include "config.h" +#ifndef __MIAV_THREADSAFE_QUEUE_H__ +#define __MIAV_THREADSAFE_QUEUE_H__ + +#include <pthread.h> +#include <semaphore.h> + +template <typename T> +class ThreadSafeQueue { +public: +  ThreadSafeQueue() { +    pthread_mutex_init (&mutex, NULL); +    sem_init(&semaphore, 0, 0); +  } + +  virtual ~ThreadSafeQueue() { +    pthread_mutex_destroy(&mutex); +    sem_destroy(&semaphore); +  } + +  virtual void push(T t) = 0; +  virtual T pop() = 0; +  virtual int size() = 0; + +  //protected: +  pthread_mutex_t mutex; +  sem_t semaphore; +}; + +#endif/*__MIAV_THREADSAFE_QUEUE_H__*/ diff --git a/src/threadsafe_queue_fifo.cc b/src/threadsafe_queue_fifo.cc new file mode 100644 index 0000000..633cb58 --- /dev/null +++ b/src/threadsafe_queue_fifo.cc @@ -0,0 +1,83 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + *            threadsafe_queue_fifo.cc + * + *  Tue Sep 27 14:01:10 CEST 2005 + *  Copyright  2005 Bent Bisballe Nyeng + *  deva@aasimon.org + ****************************************************************************/ + +/* + *  This file is part of MIaV. + * + *  MIaV is free software; you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation; either version 2 of the License, or + *  (at your option) any later version. + * + *  MIaV is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with MIaV; if not, write to the Free Software + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. + */ +#include "config.h" +#include "threadsafe_queue_fifo.h" +/* +template <typename T> +ThreadSafeQueueFIFO<T>::ThreadSafeQueueFIFO() +  //  : ThreadSafeQueue<T>() +{ +} + +template <typename T> +ThreadSafeQueueFIFO<T>::~ThreadSafeQueueFIFO() +{ +} + +template <typename T> +void ThreadSafeQueueFIFO<T>::push(T t) +{ +  // Lock mutex +  pthread_mutex_lock( &mutex ); +  queue.push(t); +  pthread_mutex_unlock( &mutex ); +  // Unlock mutex + +  sem_post(&semaphore); +} + +template <typename T> +T ThreadSafeQueueFIFO<T>::pop() +{ +  sem_wait(&semaphore); + +  T t; + +  // Lock mutex +  pthread_mutex_lock( &mutex ); +  t = queue.front(); +  queue.pop(); +  pthread_mutex_unlock( &mutex ); +  // Unlock mutex + +  return t; +} + +template <typename T> +int ThreadSafeQueueFIFO<T>::size() +{ +  int sz; + +  // Lock mutex +  pthread_mutex_lock( &mutex ); +  sz = queue.size(); +  pthread_mutex_unlock( &mutex ); +  // Unlock mutex + +  return sz; +} +*/ diff --git a/src/threadsafe_queue_fifo.h b/src/threadsafe_queue_fifo.h new file mode 100644 index 0000000..84b2fbb --- /dev/null +++ b/src/threadsafe_queue_fifo.h @@ -0,0 +1,82 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + *            threadsafe_queue_fifo.h + * + *  Tue Sep 27 14:01:10 CEST 2005 + *  Copyright  2005 Bent Bisballe Nyeng + *  deva@aasimon.org + ****************************************************************************/ + +/* + *  This file is part of MIaV. + * + *  MIaV is free software; you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation; either version 2 of the License, or + *  (at your option) any later version. + * + *  MIaV is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with MIaV; if not, write to the Free Software + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. + */ +#include "config.h" +#ifndef __MIAV_THREADSAFE_QUEUE_FIFO_H__ +#define __MIAV_THREADSAFE_QUEUE_FIFO_H__ + +#include "threadsafe_queue.h" +#include <queue> + +template <typename T> +class ThreadSafeQueueFIFO: public ThreadSafeQueue<T> { +public: +  ThreadSafeQueueFIFO() {} +  ~ThreadSafeQueueFIFO() {} + +  void push(T t) { +    // Lock mutex +    pthread_mutex_lock( &mutex ); +    queue.push(t); +    pthread_mutex_unlock( &mutex ); +    // Unlock mutex + +    sem_post(&semaphore); +  } + +  T pop() { +    sem_wait(&semaphore); +    T t; + +    // Lock mutex +    pthread_mutex_lock( &mutex ); +    t = queue.front(); +    queue.pop(); +    pthread_mutex_unlock( &mutex ); +    // Unlock mutex + +    return t; +  } + +  int size() { +    int sz; + +    // Lock mutex +    pthread_mutex_lock( &mutex ); +    sz = queue.size(); +    pthread_mutex_unlock( &mutex ); +    // Unlock mutex +     +    return sz; +  } + +private: +  std::queue<T> queue; +}; + + + +#endif/*__MIAV_THREADSAFE_QUEUE_FIFO_H__*/ diff --git a/src/threadsafe_queue_priority.cc b/src/threadsafe_queue_priority.cc new file mode 100644 index 0000000..130b0f5 --- /dev/null +++ b/src/threadsafe_queue_priority.cc @@ -0,0 +1,93 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + *            threadsafe_queue_priority.cc + * + *  Tue Sep 27 14:01:24 CEST 2005 + *  Copyright  2005 Bent Bisballe Nyeng + *  deva@aasimon.org + ****************************************************************************/ + +/* + *  This file is part of MIaV. + * + *  MIaV is free software; you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation; either version 2 of the License, or + *  (at your option) any later version. + * + *  MIaV is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with MIaV; if not, write to the Free Software + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. + */ +#include "config.h" +#include "threadsafe_queue_priority.h" + +#include "util.h" + +ThreadSafeQueuePriority::ThreadSafeQueuePriority(Info* i, unsigned int number)  +  //  : ThreadSafeQueue< Frame* >() +{ +  info = i; +  framenumber = number; +} + +ThreadSafeQueuePriority::~ThreadSafeQueuePriority() +{ +} + +void ThreadSafeQueuePriority::push(Frame *frame) +{ +  // Lock mutex +  pthread_mutex_lock( &mutex ); +  queue.push(frame); +  pthread_mutex_unlock( &mutex ); +  // Unlock mutex + +  sem_post(&semaphore); +} + +Frame *ThreadSafeQueuePriority::pop() +{ +  sem_wait(&semaphore); + +  Frame *tmpframe = NULL; +  Frame *frame = NULL; + +  while( frame == NULL ) { +    // Lock mutex +    pthread_mutex_lock( &mutex ); + +    tmpframe = queue.top(); +     +    if(tmpframe && tmpframe->number == framenumber ) { +      queue.pop(); +      frame = tmpframe; +      framenumber++; +    } +     +    pthread_mutex_unlock( &mutex ); +    // Unlock mutex + +    if(frame == NULL) sleep_0_2_frame(); +  } + +  return frame; +} + +int ThreadSafeQueuePriority::size() +{ +  int sz; + +  // Lock mutex +  pthread_mutex_lock( &mutex ); +  sz = queue.size(); +  pthread_mutex_unlock( &mutex ); +  // Unlock mutex + +  return sz; +} diff --git a/src/threadsafe_queue_priority.h b/src/threadsafe_queue_priority.h new file mode 100644 index 0000000..8d3cdf1 --- /dev/null +++ b/src/threadsafe_queue_priority.h @@ -0,0 +1,64 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + *            threadsafe_queue_priority.h + * + *  Tue Sep 27 14:01:24 CEST 2005 + *  Copyright  2005 Bent Bisballe Nyeng + *  deva@aasimon.org + ****************************************************************************/ + +/* + *  This file is part of MIaV. + * + *  MIaV is free software; you can redistribute it and/or modify + *  it under the terms of the GNU General Public License as published by + *  the Free Software Foundation; either version 2 of the License, or + *  (at your option) any later version. + * + *  MIaV is distributed in the hope that it will be useful, + *  but WITHOUT ANY WARRANTY; without even the implied warranty of + *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the + *  GNU General Public License for more details. + * + *  You should have received a copy of the GNU General Public License + *  along with MIaV; if not, write to the Free Software + *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA. + */ +#include "config.h" +#ifndef __MIAV_THREADSAFE_QUEUE_PRIORITY_H__ +#define __MIAV_THREADSAFE_QUEUE_PRIORITY_H__ + +#include "threadsafe_queue.h" + +#include "frame.h" + +#include <queue> +#include <functional> + +#include "info.h" + +// Method for use, when comparing Frames in priority queue. +template <typename T> +struct priority : std::binary_function<T, T, bool> { +  bool operator() (const T& a, const T& b) const { +    return ((Frame*)a)->number > ((Frame*)b)->number; +  } +}; + +class ThreadSafeQueuePriority: public ThreadSafeQueue< Frame* > { +public: +  ThreadSafeQueuePriority(Info *info, unsigned int framenumber = 0); +  ~ThreadSafeQueuePriority(); + +  void push(Frame *frame); +  Frame *pop(); +  int size(); + +private: +  Info* info; + +  unsigned int framenumber; +  std::priority_queue< Frame*, std::vector<Frame*>, priority<Frame*> > queue; +}; + +#endif/*__MIAV_THREADSAFE_QUEUE_PRIORITY_H__*/ | 
