From b04122f3f7884de08eb5d59bb3fd2bff829f9039 Mon Sep 17 00:00:00 2001 From: deva Date: Sun, 22 May 2005 15:49:22 +0000 Subject: Added multithreaded encoding support. --- configure.in | 6 ++ etc/miav.conf | 2 +- src/Makefile.am | 47 ++++++------- src/frame.cc | 4 ++ src/frame.h | 34 +++++++++ src/miav.cc | 80 +++++++++++++++++++++ src/mov_encoder.cc | 108 ++++++++++++++-------------- src/mov_encoder.h | 40 ++++++----- src/mov_encoder_thread.cc | 176 ++++++++++++++++++++++++---------------------- src/mov_encoder_thread.h | 23 +++++- src/mov_encoder_writer.cc | 117 ++++++++++++++++++++++++++++++ src/mov_encoder_writer.h | 67 ++++++++++++++++++ src/queue.h | 45 +----------- src/server.cc | 6 +- src/server_status.cc | 5 ++ 15 files changed, 531 insertions(+), 229 deletions(-) create mode 100644 src/mov_encoder_writer.cc create mode 100644 src/mov_encoder_writer.h diff --git a/configure.in b/configure.in index a0f8ce3..e6ff7ac 100644 --- a/configure.in +++ b/configure.in @@ -40,6 +40,12 @@ else AC_MSG_WARN([*** Building without GUI support!]) fi +dnl ====================== +dnl Check for pthread library +dnl ====================== +AC_CHECK_HEADER(pthread.h, , AC_MSG_ERROR([*** pthread headers not found!])) +AC_CHECK_LIB(pthread, sem_init, , AC_MSG_ERROR([*** libpthread not found!])) + dnl ====================== dnl Check for dv library dnl ====================== diff --git a/etc/miav.conf b/etc/miav.conf index 476bbef..6eb33e1 100644 --- a/etc/miav.conf +++ b/etc/miav.conf @@ -34,4 +34,4 @@ frame_quality = 80 # The number of threads started for paralel encoding on the server # (for multiprocessor systems) -encoding_threads = 4 +encoding_threads = 2 diff --git a/src/Makefile.am b/src/Makefile.am index 2bdb4dc..499e49b 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -1,6 +1,3 @@ -#frekin' wierd -## TODO: Move ffmpeg, libxml and libsdl into configure.in - AM_CXXFLAGS := $(CXXFLAGS) $(EXTRA_CXXFLAGS) -I../include $(QT_CXXFLAGS) \ -DQT_THREAD_SUPPORT \ -DPIXMAPS=\"$(datadir)/pixmaps\" \ @@ -11,59 +8,61 @@ bin_PROGRAMS = miav miav_SOURCES = $(shell if [ $QT_CXXFLAGS ] ; then ../tools/MocList cc; fi ) \ aboutwindow.cc \ camera.cc \ + cprquerydialog.cc \ decoder.cc \ + dv1394.cc \ encoder.cc \ frame.cc \ + img_encoder.cc \ + info_console.cc \ + info_gui.cc \ mainwindow.cc \ - cprquerydialog.cc \ + messagebox.cc \ miav.cc \ + miav_config.cc \ + mov_encoder.cc \ + mov_encoder_thread.cc \ + mov_encoder_writer.cc \ network.cc \ player.cc \ + server.cc \ + server_status.cc \ socket.cc \ thread.cc \ util.cc \ - videowidget.cc \ - messagebox.cc \ - miav_config.cc \ - mov_encoder.cc \ - img_encoder.cc \ - server.cc \ - dv1394.cc \ - server_status.cc \ - info_gui.cc \ - info_console.cc \ - mov_encoder_thread.cc + videowidget.cc EXTRA_DIST = \ aboutwindow.h \ camera.h \ cprquerydialog.h \ + debug.h \ decoder.h \ + dv.h \ + dv1394.h \ encoder.h \ frame.h \ img_encoder.h \ + info.h \ + info_console.h \ + info_gui.h \ mainwindow.h \ messagebox.h \ miav.h \ miav_config.h \ mov_encoder.h \ + mov_encoder_thread.h \ + mov_encoder_writer.h \ network.h \ package.h \ player.h \ queue.h \ server.h \ + server_status.h \ socket.h \ thread.h \ util.h \ - videowidget.h \ - debug.h \ - dv.h \ - dv1394.h \ - server_status.h \ - info.h \ - info_gui.h \ - info_console.h \ - mov_encoder_thread.h + videowidget.h miav_LDADD := $(shell if [ $QT_CXXFLAGS ] ; then ../tools/MocList o; fi ) diff --git a/src/frame.cc b/src/frame.cc index 002ab43..ee5451d 100644 --- a/src/frame.cc +++ b/src/frame.cc @@ -31,6 +31,9 @@ /* * $Log$ + * Revision 1.7 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.6 2005/05/03 08:31:59 deva * Removed the error object, and replaced it with a more generic info object. * @@ -51,6 +54,7 @@ Frame::Frame(unsigned char *d, int sz) data = (unsigned char *)malloc(sz); if(d) memcpy(data, d, sz); size = sz; + number = 0; } Frame::~Frame() diff --git a/src/frame.h b/src/frame.h index aebe3cb..72cfe8e 100644 --- a/src/frame.h +++ b/src/frame.h @@ -31,6 +31,9 @@ /* * $Log$ + * Revision 1.6 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.5 2005/05/03 08:31:59 deva * Removed the error object, and replaced it with a more generic info object. * @@ -45,17 +48,48 @@ #ifndef __FRAME_H__ #define __FRAME_H__ +// Definition of vector +#include + +// Definition of priority_queue +#include + class Frame { public: Frame(unsigned char *d, int sz); ~Frame(); + /* + // Smaller frame number is higher priority + bool operator<(const Frame& f) const { + return number > f.number; + } + */ + unsigned char *data; int size; + unsigned int number; + bool shoot; bool freeze; bool record; }; +#include + +template +struct frame_priority : std::binary_function { + bool operator() (const T& a, const T& b) const { + return ((Frame*)a)->number > ((Frame*)b)->number; + } +}; + +// Additional helper types. +typedef std::vector< Frame* > FrameVector; +typedef std::queue< FrameVector* > FrameVectorQueue; +typedef std::priority_queue< Frame*, + std::vector, + frame_priority > FramePriorityQueue; + #endif/*__FRAME_H__*/ diff --git a/src/miav.cc b/src/miav.cc index ba30a17..c3f08fe 100644 --- a/src/miav.cc +++ b/src/miav.cc @@ -31,6 +31,9 @@ /* * $Log$ + * Revision 1.10 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.9 2005/05/03 17:13:25 deva * Fixed some missong Info object references. * @@ -92,6 +95,83 @@ int grab(int argc, char *argv[]) { } +#if 0 +/** + * Peters DAEMON code + */ +#include +#include +#include + +int rundaemon() +{ + int pipes[2]; + + int f; + + pipe(pipes); + + f = fork(); + switch(f) { + case -1: // error + fprintf(stderr, "Error, could not fork()!\n"); + exit(0); + break; + case 0: // child + return communicationCtl(pipes[0]); + break; + default: // parent + signal(SIGCHLD, reportAndExit); + + return serialportCtl(pipes[1]); + break; + } + + return 0; +} + +void daemon() { + int f; + int fd; + + chdir("/"); + umask(0); + + f = fork(); + switch(f) { + case -1: + fprintf(stderr, "fork() error!\n"); + return 0; + break; + case 0: + if( (fp = fopen("/tmp/termo.out", "w")) == NULL) { + fprintf(stderr, "Outfile open error!\n"); + exit(0); + } + fd = open("/dev/null", O_NOCTTY | O_RDWR, 0666); + dup2(0, fd); + dup2(1, fd); + dup2(2, fd); + setsid(); + signal (SIGTERM, SIG_IGN); + signal (SIGINT, SIG_IGN); + signal (SIGHUP, SIG_IGN); + serialfd = initSerialPort(INDEVICE); + if(setgid(NOBODY_GROUP) != 0) {fprintf(fp, "GRP ch ERR\n");return 1;} + if(setuid(NOBODY_USER) != 0) {fprintf(fp, "USER ch ERR\n");return 1;} + + return rundaemon(); + break; + default: + exit(0); + } + return 0; +} +/** + * End og Peters DAEMON code + */ +#endif + /** * This function starts the MIaV server. */ diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc index 7bffc50..0a478fc 100644 --- a/src/mov_encoder.cc +++ b/src/mov_encoder.cc @@ -39,28 +39,26 @@ /* * $Log$ - * Revision 1.23 2005/05/19 14:10:22 deva + * Revision 1.24 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. * + * Revision 1.23 2005/05/19 14:10:22 deva * Multithreading rulez? * * Revision 1.22 2005/05/17 19:16:26 deva - * * Made new mpeg writer work, with proper file permissions. * * Revision 1.21 2005/05/17 14:30:56 deva * Added code, preparing threaded encoding. * * Revision 1.20 2005/05/16 16:00:57 deva - * * Lots of stuff! * * Revision 1.19 2005/05/16 13:25:52 deva - * * Moved video setting to configuration file. * Fine tuned setting for 2.4ghz server * * Revision 1.18 2005/05/16 11:13:24 deva - * * Optimized some encoding parameters. * * Revision 1.17 2005/05/16 10:45:10 deva @@ -102,10 +100,10 @@ #include "miav_config.h" #include "debug.h" -//av_alloc_format_context -//av_destruct_packet_nofree -MovEncoder::MovEncoder() +MovEncoder::MovEncoder(sem_t *r_sem, + FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex, + FramePriorityQueue *out, sem_t *out_sem, pthread_mutex_t *out_mutex) { // FIXME: Hmmm... should this be detected somewhere?! int w = 720; @@ -212,12 +210,18 @@ MovEncoder::MovEncoder() fame_init(fame_context, &fame_par, fame_buffer, FAME_BUFFER_SIZE); + running = true; - // Thread stuff - sem_init(&sem, 0, 0); - sem_init(&done, 0, 0); + inputqueue = in; + outputqueue = out; - running = true; + input_sem = in_sem; + output_sem = out_sem; + + read_sem = r_sem; + + input_mutex = in_mutex; + output_mutex = out_mutex; } MovEncoder::~MovEncoder() @@ -265,7 +269,7 @@ Frame *MovEncoder::encode_video(Frame *dvframe) dvdecoder->num_dif_seqs = 12; } - pixels[ 0 ] = rgb; // We use this as the output buffer + pixels[ 0 ] = picture; // We use this as the output buffer pitches[ 0 ] = w * 2; dv_decode_full_frame(dvdecoder, @@ -279,7 +283,7 @@ Frame *MovEncoder::encode_video(Frame *dvframe) uint8_t *y = yuv.y; uint8_t *cb = yuv.u; uint8_t *cr = yuv.v; - uint8_t *p = rgb; + uint8_t *p = picture; for ( int i = 0; i < h; i += 2 ) { // process two scanlines (one from each field, interleaved) @@ -326,59 +330,51 @@ void MovEncoder::encode_audio(Frame *dvframe) // TODO: Do some audio stuff here sometime! } -void MovEncoder::encodeSequence(Queue *queue) -{ - // set input queue - inputqueue = queue; - - // unlock semaphore - sem_post(&sem); -} - - // this runs in a thread void MovEncoder::run() { - fprintf(stderr, "Encoder Ready\n"); fflush(stderr); + FrameVector *item; + Frame *in_frame; + Frame *out_frame; + + fprintf(stderr, "\t\t\t\tEncoder Ready\n"); fflush(stderr); while(running) { - // wait for semaphore - // lock semaphore - sem_wait(&sem); - if(inputqueue == NULL) continue; + sem_wait(input_sem); - fprintf(stderr, "."); fflush(stderr); + fprintf(stderr, "\t\t\t\tReading block\n"); fflush(stderr); - // allocate new output queue - outputqueue = new Queue(); - // while input queue.pop - Frame *fi, *fo; - while(inputqueue->length() > 0) { - fi = inputqueue->pop(); + // Lock inout mutex + pthread_mutex_lock(input_mutex); + item = inputqueue->front(); + inputqueue->pop(); + pthread_mutex_unlock(input_mutex); + // Unlock input mutex - // encode frame from input queue - fo = encode(fi); + if(!item) { + fprintf(stderr, "\t\t\t\tEmpty block detected.\n"); fflush(stderr); + continue; + } - // and push result on output queue - outputqueue->push(fo); + for(int cnt = 0; cnt < item->size(); cnt++) { + in_frame = item->at(cnt); + out_frame = encode(in_frame); + out_frame->number = in_frame->number; - // delete the input frame - //delete fi; - } - // delete input queue - delete inputqueue; - sem_post(&done); - } -} + delete in_frame; -Queue *MovEncoder::getResultSequence() -{ - // wait for sempahore - sem_wait(&done); + // Lock output mutex + pthread_mutex_lock(output_mutex); + outputqueue->push(out_frame); + pthread_mutex_unlock(output_mutex); + // Unlock output mutex + + fprintf(stderr, "\t\t\t\tEncoded [%d] - pushed it for writing\n", in_frame->number); fflush(stderr); + } - // fprintf(stderr, "POP!\n"); fflush(stderr); + delete item; - // return output queue - return outputqueue; + sem_post(read_sem); + sem_post(output_sem); + } } - diff --git a/src/mov_encoder.h b/src/mov_encoder.h index 13eb63b..d7e0c17 100644 --- a/src/mov_encoder.h +++ b/src/mov_encoder.h @@ -36,19 +36,19 @@ /* * $Log$ - * Revision 1.9 2005/05/19 14:10:22 deva + * Revision 1.10 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. * + * Revision 1.9 2005/05/19 14:10:22 deva * Multithreading rulez? * * Revision 1.8 2005/05/17 14:30:56 deva * Added code, preparing threaded encoding. * * Revision 1.7 2005/05/09 16:40:20 deva - * * Added optimize yuv conversion code * * Revision 1.6 2005/05/05 20:41:38 deva - * * Removed the last pieces of ffmpeg... replaced it with libfame... * Not quite working yet, but all the major code is in place! * @@ -66,6 +66,7 @@ #include #include + // Use libfame #include @@ -73,52 +74,57 @@ #include #include +#include +using namespace std; + #include "frame.h" -#include "queue.h" #include "util.h" #include "thread.h" +#include // size specifies the length of the buffer. #define FAME_BUFFER_SIZE (2*720*576*4) // FIXME: One size fits all... class MovEncoder : public Thread { public: - MovEncoder(); + MovEncoder(sem_t *r_sem, + FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex, + FramePriorityQueue *out, sem_t *out_sem, pthread_mutex_t *out_mutex); ~MovEncoder(); Frame* encode(Frame *frame); void run(); - Queue *getResultSequence(); - void encodeSequence(Queue *queue); - volatile bool running; private: - // Input queue - Queue *inputqueue; - Queue *outputqueue; + // Input/Output queues + FrameVectorQueue *inputqueue; + FramePriorityQueue *outputqueue; //thread stuff - sem_t sem; - sem_t done; + sem_t *input_sem; + sem_t *output_sem; + + sem_t *read_sem; + pthread_mutex_t *input_mutex; + pthread_mutex_t *output_mutex; + Frame *encode_video(Frame *frame); void encode_audio(Frame *frame); - // buffer is the buffer where encoded data will be written to. It must be large - // enough to contain a few frames. + // libFAME encoder unsigned char *fame_buffer; fame_parameters_t fame_par; fame_context_t *fame_context; fame_yuv_t yuv; - FILE *f; // libdv decoder dv_decoder_t *dvdecoder; - unsigned char rgb[FAME_BUFFER_SIZE]; + unsigned char picture[FAME_BUFFER_SIZE]; }; #endif diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc index ed71a31..d95961d 100644 --- a/src/mov_encoder_thread.cc +++ b/src/mov_encoder_thread.cc @@ -31,6 +31,9 @@ /* * $Log$ + * Revision 1.7 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.6 2005/05/19 14:10:22 deva * * Multithreading rulez? @@ -58,120 +61,123 @@ MovEncoderThread::MovEncoderThread(const char *filename) { - file = open(filename, - O_CREAT | O_WRONLY, //| O_LARGEFILE - S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); - if(file == -1) { - fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno)); - return; - } + outputqueue = new FramePriorityQueue(); + inputqueue = new FrameVectorQueue(); + block = new FrameVector(); + + num_frames_in_block = config->readString("frame_sequence")->length(); + fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr); threads = config->readInt("encoding_threads"); + // Thread stuff + sem_init(&in_sem, 0, 0); + sem_init(&out_sem, 0, 0); + sem_init(&read_sem, 0, 0); + + for(int cnt = 0; cnt < threads; cnt++) sem_post(&read_sem); + + pthread_mutex_init (&input_mutex, NULL); + pthread_mutex_init (&output_mutex, NULL); + + writer = new MovEncoderWriter(filename, outputqueue, &out_sem, &output_mutex); + writer_tid = new pthread_t; + pthread_create (writer_tid, NULL, thread_run, writer); + for(int cnt = 0; cnt < threads; cnt++) { - encs.push_back(new MovEncoder()); + encs.push_back(new MovEncoder(&read_sem, + inputqueue, &in_sem, &input_mutex, + outputqueue, &out_sem, &output_mutex)); tids.push_back(new pthread_t); pthread_create (tids[cnt], NULL, thread_run, encs[cnt]); } - current_encoder = 0; current_frame = 0; - - num_frames_in_block = config->readString("frame_sequence")->length(); - fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr); - - inputqueue = new Queue(); + frame_number = 0; } MovEncoderThread::~MovEncoderThread() { - - fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr); - - // Push any hanging frames. - encs[current_encoder]->encodeSequence(inputqueue); - - fprintf(stderr, "Clear - Readback\n"); fflush(stderr); - - /* - * Readback mode - */ - for(int cnt = 0; cnt <= current_encoder; cnt++) { - - // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); - - Queue *outputqueue = encs[cnt]->getResultSequence(); - Frame *f; - while((f = outputqueue->pop())) { - int i = write(file, f->data, f->size); - if(i == -1) perror("Write failed"); - - // fprintf(stderr, "wrote[%d]-", i); fflush(stderr); - - delete f; - } - } - - fprintf(stderr, "Clear - join threads\n"); fflush(stderr); + // These should not be deleted here... its done elsewhere. + inputqueue = NULL; + sem_post(&out_sem); + + // Stop the encoding threads. for(int cnt = 0; cnt < threads; cnt++) { encs[cnt]->running = false; - encs[cnt]->encodeSequence(NULL); + } + + // Kick them to initiate the exit. + for(int cnt = 0; cnt < threads; cnt++) { + sem_post(&in_sem); + } + + // They should be exited now, so we can delete them. + for(int cnt = 0; cnt < threads; cnt++) { pthread_join(*tids[cnt], NULL); delete encs[cnt]; + delete tids[cnt]; } + + // Tell the writer to stop + writer->running = false; - fprintf(stderr, "Clear - close file\n"); fflush(stderr); + // Kick it to make it stop. + sem_post(&out_sem); - if(file != -1) close(file); + // Destroy the thread + pthread_join(*writer_tid, NULL); + delete writer_tid; - fprintf(stderr, "Clear - done\n"); fflush(stderr); + // delete the writer (end thereby close the file) + delete writer; + // Destroy the semaphores. + sem_destroy(&in_sem); + sem_destroy(&out_sem); + sem_destroy(&read_sem); } void MovEncoderThread::encode(Frame* frame) { - if(file == -1) return; - - // fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr); - inputqueue->bpush(frame); - - /* - * Encode mode - */ - // Switch frame - current_frame++; - if(current_frame >= num_frames_in_block) { - - // fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr); + if(frame == NULL) { + fprintf(stderr, "NULL frame detected.\n"); + // Terminate + return; + } - encs[current_encoder]->encodeSequence(inputqueue); - inputqueue = new Queue(); + frame->number = frame_number; + block->push_back(frame); - // Switch encoder + // Switch frame + if(block->size() == num_frames_in_block) { + // Wait until a free encoder. + /* + int val; + sem_getvalue(&read_sem, &val); + fprintf(stderr, "Sem Value: %d\n", val); fflush(stderr); + */ + sem_wait(&read_sem); + + // Lock input mutex + pthread_mutex_lock(&input_mutex); + inputqueue->push(block); + pthread_mutex_unlock(&input_mutex); + // Unlock input mutex + + fprintf(stderr, "Frame vector [%d-%d] pushed\n", + block->at(0)->number, + block->at(block->size() - 1)->number); + fflush(stderr); + + // Kick encoders + sem_post(&in_sem); + + // Start new block current_frame = 0; - current_encoder++; - if(current_encoder >= threads) { - // switch mode - /* - * Readback mode - */ - for(int cnt = 0; cnt < threads; cnt++) { - - // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr); - - Queue *outputqueue = encs[cnt]->getResultSequence(); - Frame *f; - while((f = outputqueue->pop())) { - int i = write(file, f->data, f->size); - if(i == -1) perror("Write failed"); - - // fprintf(stderr, "wrote[%d]-", i); fflush(stderr); - - delete f; - } - } - current_encoder = 0; - } + block = new FrameVector; } + + frame_number ++; } diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h index 60f4c5c..989dd87 100644 --- a/src/mov_encoder_thread.h +++ b/src/mov_encoder_thread.h @@ -31,6 +31,9 @@ /* * $Log$ + * Revision 1.5 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.4 2005/05/19 14:10:22 deva * * Multithreading rulez? @@ -62,6 +65,7 @@ using namespace std; #include "mov_encoder.h" +#include "mov_encoder_writer.h" class MovEncoderThread { public: @@ -71,16 +75,29 @@ public: void encode(Frame* frame); private: - Queue *inputqueue; + FrameVectorQueue *inputqueue; + FramePriorityQueue *outputqueue; + FrameVector *block; + + //thread stuff + sem_t in_sem; + sem_t out_sem; + + sem_t read_sem; + + pthread_mutex_t input_mutex; + pthread_mutex_t output_mutex; // Used for encoder switching - int current_encoder; int current_frame; + unsigned int frame_number; int num_frames_in_block; + MovEncoderWriter *writer; + pthread_t* writer_tid; + int threads; - int file; vector encs; vector tids; }; diff --git a/src/mov_encoder_writer.cc b/src/mov_encoder_writer.cc new file mode 100644 index 0000000..941ef85 --- /dev/null +++ b/src/mov_encoder_writer.cc @@ -0,0 +1,117 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * mov_encoder_writer.cc + * + * Sun May 22 12:51:36 CEST 2005 + * Copyright 2005 Bent Bisballe + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ + +/* + * $Id$ + */ + +/* + * $Log$ + * Revision 1.1 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * + */ +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include "mov_encoder_writer.h" + +MovEncoderWriter::MovEncoderWriter(const char* filename, FramePriorityQueue *q, sem_t *s, pthread_mutex_t *m) +{ + file = open(filename, + O_CREAT | O_WRONLY, //| O_LARGEFILE + S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH); + if(file == -1) { + fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno)); + return; + } + + sem = s; + queue = q; + frame_number = 0; + mutex = m; + + running = true; +} + +MovEncoderWriter::~MovEncoderWriter() +{ + if(file != -1) close(file); +} + +void MovEncoderWriter::run() +{ + fprintf(stderr, "\t\t\t\t\t\t\t\t\tFile Writer ready!\n"); fflush(stderr); + + Frame *frame; + + if(file == -1) return; + + while(running) { + sem_wait(sem); + + if(queue->size() == 0) continue; + + pthread_mutex_lock(mutex); + frame = queue->top(); + if(frame->number == frame_number) queue->pop(); + pthread_mutex_unlock(mutex); + + /* + if(!frame) { + fprintf(stderr, "\t\t\t\t\t\t\t\t\tNULL frame detected"); + continue; + } + */ + + fprintf(stderr, "\t\t\t\t\t\t\t\t\tChecking frame [%d] against expected [%d]\n", + frame->number, frame_number); fflush(stderr); + while(frame->number == frame_number) { + + write(file, frame->data, frame->size); + delete frame; + + fprintf(stderr, "\t\t\t\t\t\t\t\t\tWrite frame [%d]\n", frame->number); fflush(stderr); + + frame_number++; + + pthread_mutex_lock(mutex); + frame = queue->top(); + if(frame->number == frame_number) queue->pop(); + pthread_mutex_unlock(mutex); + + } + } +} diff --git a/src/mov_encoder_writer.h b/src/mov_encoder_writer.h new file mode 100644 index 0000000..44647a0 --- /dev/null +++ b/src/mov_encoder_writer.h @@ -0,0 +1,67 @@ +/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */ +/*************************************************************************** + * mov_encoder_writer.h + * + * Sun May 22 12:51:35 CEST 2005 + * Copyright 2005 Bent Bisballe + * deva@aasimon.org + ****************************************************************************/ + +/* + * This file is part of MIaV. + * + * MIaV is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * MIaV is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with MIaV; if not, write to the Free Software + * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA. + */ + +/* + * $Id$ + */ + +/* + * $Log$ + * Revision 1.1 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * + */ + +#include +#ifndef __MIAV_MOV_ENCODER_WRITER_H__ +#define __MIAV_MOV_ENCODER_WRITER_H__ + +#include "frame.h" +#include "thread.h" + +class MovEncoderWriter : public Thread { +public: + MovEncoderWriter(const char* filename, FramePriorityQueue *q, sem_t *s, pthread_mutex_t *m); + ~MovEncoderWriter(); + + void run(); + + volatile bool running; + +private: + int file; + + FramePriorityQueue *queue; + pthread_mutex_t *mutex; + + sem_t *sem; + + unsigned int frame_number; +}; + + +#endif/*__MIAV_MOV_ENCODER_WRITER_H__*/ diff --git a/src/queue.h b/src/queue.h index 7c56e93..fa03c8e 100644 --- a/src/queue.h +++ b/src/queue.h @@ -38,6 +38,9 @@ /* * $Log$ + * Revision 1.17 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.16 2005/05/19 14:10:22 deva * * Multithreading rulez? @@ -86,7 +89,6 @@ public: ~Queue(); void push(T *t); - void bpush(T *t); T *pop(); T *peek(); @@ -173,47 +175,6 @@ void Queue::push(T *t) pthread_mutex_unlock(&mutex); } -/** - * Push element on queue from the bottom. - */ -template -void Queue::bpush(T *t) -{ - if(locked) { - delete t; - return; - } - - pthread_mutex_lock(&mutex); - - buf_t *b = (buf_t*)xmalloc(sizeof(*b)); - b->data = (void*)t; - - assert(b != NULL); - - if(limit && count > 0) { - T* tmp = (T*)_pop(); - delete tmp; - } - - if(!head) { - head = tail = b; - b->next = b->prev = NULL; - count = 1; - pthread_mutex_unlock(&mutex); - return; - } - - b->prev = head; - b->next = NULL; - if(head) - head->next = b; - head = b; - count++; - - pthread_mutex_unlock(&mutex); -} - /** * Pop element from queue. * If queue is empty, NULL is returned. diff --git a/src/server.cc b/src/server.cc index 135fb27..a79f463 100644 --- a/src/server.cc +++ b/src/server.cc @@ -31,6 +31,9 @@ /* * $Log$ + * Revision 1.18 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.17 2005/05/17 19:16:26 deva * * Made new mpeg writer work, with proper file permissions. @@ -344,7 +347,8 @@ void newConnection(Socket *socket) if(freeze_frame) delete freeze_frame; freeze_frame = frame; } else { - delete frame; + // Never delete the frames here! + //delete frame; } frame = new Frame(NULL, DVPACKAGE_SIZE); diff --git a/src/server_status.cc b/src/server_status.cc index ab155e4..23c6c3d 100644 --- a/src/server_status.cc +++ b/src/server_status.cc @@ -31,6 +31,9 @@ /* * $Log$ + * Revision 1.8 2005/05/22 15:49:22 deva + * Added multithreaded encoding support. + * * Revision 1.7 2005/05/17 15:12:51 deva * Fixed file rights (All read on files and directories, and all execute on directories). * @@ -77,6 +80,8 @@ ServerStatus::~ServerStatus() void ServerStatus::checkPoint() { + return; + static int frame = 0; frame++; if(frame % UPD != 0) return; -- cgit v1.2.3