summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordeva <deva>2005-05-22 15:49:22 +0000
committerdeva <deva>2005-05-22 15:49:22 +0000
commitb04122f3f7884de08eb5d59bb3fd2bff829f9039 (patch)
tree6898a061f4e10a6026faa1ffe4a7a319256dacdf
parentd74c7a00c417cffdc93a82efa2841e23d823bea6 (diff)
Added multithreaded encoding support.
-rw-r--r--configure.in6
-rw-r--r--etc/miav.conf2
-rw-r--r--src/Makefile.am47
-rw-r--r--src/frame.cc4
-rw-r--r--src/frame.h34
-rw-r--r--src/miav.cc80
-rw-r--r--src/mov_encoder.cc108
-rw-r--r--src/mov_encoder.h40
-rw-r--r--src/mov_encoder_thread.cc176
-rw-r--r--src/mov_encoder_thread.h23
-rw-r--r--src/mov_encoder_writer.cc117
-rw-r--r--src/mov_encoder_writer.h67
-rw-r--r--src/queue.h45
-rw-r--r--src/server.cc6
-rw-r--r--src/server_status.cc5
15 files changed, 531 insertions, 229 deletions
diff --git a/configure.in b/configure.in
index a0f8ce3..e6ff7ac 100644
--- a/configure.in
+++ b/configure.in
@@ -41,6 +41,12 @@ else
fi
dnl ======================
+dnl Check for pthread library
+dnl ======================
+AC_CHECK_HEADER(pthread.h, , AC_MSG_ERROR([*** pthread headers not found!]))
+AC_CHECK_LIB(pthread, sem_init, , AC_MSG_ERROR([*** libpthread not found!]))
+
+dnl ======================
dnl Check for dv library
dnl ======================
AC_CHECK_HEADER(libdv/dv.h, , AC_MSG_ERROR([*** libdv headers not found!]))
diff --git a/etc/miav.conf b/etc/miav.conf
index 476bbef..6eb33e1 100644
--- a/etc/miav.conf
+++ b/etc/miav.conf
@@ -34,4 +34,4 @@ frame_quality = 80
# The number of threads started for paralel encoding on the server
# (for multiprocessor systems)
-encoding_threads = 4
+encoding_threads = 2
diff --git a/src/Makefile.am b/src/Makefile.am
index 2bdb4dc..499e49b 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1,6 +1,3 @@
-#frekin' wierd
-## TODO: Move ffmpeg, libxml and libsdl into configure.in
-
AM_CXXFLAGS := $(CXXFLAGS) $(EXTRA_CXXFLAGS) -I../include $(QT_CXXFLAGS) \
-DQT_THREAD_SUPPORT \
-DPIXMAPS=\"$(datadir)/pixmaps\" \
@@ -11,59 +8,61 @@ bin_PROGRAMS = miav
miav_SOURCES = $(shell if [ $QT_CXXFLAGS ] ; then ../tools/MocList cc; fi ) \
aboutwindow.cc \
camera.cc \
+ cprquerydialog.cc \
decoder.cc \
+ dv1394.cc \
encoder.cc \
frame.cc \
+ img_encoder.cc \
+ info_console.cc \
+ info_gui.cc \
mainwindow.cc \
- cprquerydialog.cc \
+ messagebox.cc \
miav.cc \
+ miav_config.cc \
+ mov_encoder.cc \
+ mov_encoder_thread.cc \
+ mov_encoder_writer.cc \
network.cc \
player.cc \
+ server.cc \
+ server_status.cc \
socket.cc \
thread.cc \
util.cc \
- videowidget.cc \
- messagebox.cc \
- miav_config.cc \
- mov_encoder.cc \
- img_encoder.cc \
- server.cc \
- dv1394.cc \
- server_status.cc \
- info_gui.cc \
- info_console.cc \
- mov_encoder_thread.cc
+ videowidget.cc
EXTRA_DIST = \
aboutwindow.h \
camera.h \
cprquerydialog.h \
+ debug.h \
decoder.h \
+ dv.h \
+ dv1394.h \
encoder.h \
frame.h \
img_encoder.h \
+ info.h \
+ info_console.h \
+ info_gui.h \
mainwindow.h \
messagebox.h \
miav.h \
miav_config.h \
mov_encoder.h \
+ mov_encoder_thread.h \
+ mov_encoder_writer.h \
network.h \
package.h \
player.h \
queue.h \
server.h \
+ server_status.h \
socket.h \
thread.h \
util.h \
- videowidget.h \
- debug.h \
- dv.h \
- dv1394.h \
- server_status.h \
- info.h \
- info_gui.h \
- info_console.h \
- mov_encoder_thread.h
+ videowidget.h
miav_LDADD := $(shell if [ $QT_CXXFLAGS ] ; then ../tools/MocList o; fi )
diff --git a/src/frame.cc b/src/frame.cc
index 002ab43..ee5451d 100644
--- a/src/frame.cc
+++ b/src/frame.cc
@@ -31,6 +31,9 @@
/*
* $Log$
+ * Revision 1.7 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.6 2005/05/03 08:31:59 deva
* Removed the error object, and replaced it with a more generic info object.
*
@@ -51,6 +54,7 @@ Frame::Frame(unsigned char *d, int sz)
data = (unsigned char *)malloc(sz);
if(d) memcpy(data, d, sz);
size = sz;
+ number = 0;
}
Frame::~Frame()
diff --git a/src/frame.h b/src/frame.h
index aebe3cb..72cfe8e 100644
--- a/src/frame.h
+++ b/src/frame.h
@@ -31,6 +31,9 @@
/*
* $Log$
+ * Revision 1.6 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.5 2005/05/03 08:31:59 deva
* Removed the error object, and replaced it with a more generic info object.
*
@@ -45,17 +48,48 @@
#ifndef __FRAME_H__
#define __FRAME_H__
+// Definition of vector
+#include <vector>
+
+// Definition of priority_queue
+#include <queue>
+
class Frame {
public:
Frame(unsigned char *d, int sz);
~Frame();
+ /*
+ // Smaller frame number is higher priority
+ bool operator<(const Frame& f) const {
+ return number > f.number;
+ }
+ */
+
unsigned char *data;
int size;
+ unsigned int number;
+
bool shoot;
bool freeze;
bool record;
};
+#include <functional>
+
+template <typename T>
+struct frame_priority : std::binary_function<T, T, bool> {
+ bool operator() (const T& a, const T& b) const {
+ return ((Frame*)a)->number > ((Frame*)b)->number;
+ }
+};
+
+// Additional helper types.
+typedef std::vector< Frame* > FrameVector;
+typedef std::queue< FrameVector* > FrameVectorQueue;
+typedef std::priority_queue< Frame*,
+ std::vector<Frame*>,
+ frame_priority<Frame*> > FramePriorityQueue;
+
#endif/*__FRAME_H__*/
diff --git a/src/miav.cc b/src/miav.cc
index ba30a17..c3f08fe 100644
--- a/src/miav.cc
+++ b/src/miav.cc
@@ -31,6 +31,9 @@
/*
* $Log$
+ * Revision 1.10 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.9 2005/05/03 17:13:25 deva
* Fixed some missong Info object references.
*
@@ -92,6 +95,83 @@ int grab(int argc, char *argv[]) {
}
+#if 0
+/**
+ * Peters DAEMON code
+ */
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <unistd.h>
+
+int rundaemon()
+{
+ int pipes[2];
+
+ int f;
+
+ pipe(pipes);
+
+ f = fork();
+ switch(f) {
+ case -1: // error
+ fprintf(stderr, "Error, could not fork()!\n");
+ exit(0);
+ break;
+ case 0: // child
+ return communicationCtl(pipes[0]);
+ break;
+ default: // parent
+ signal(SIGCHLD, reportAndExit);
+
+ return serialportCtl(pipes[1]);
+ break;
+ }
+
+ return 0;
+}
+
+void daemon() {
+ int f;
+ int fd;
+
+ chdir("/");
+ umask(0);
+
+ f = fork();
+ switch(f) {
+ case -1:
+ fprintf(stderr, "fork() error!\n");
+ return 0;
+ break;
+ case 0:
+ if( (fp = fopen("/tmp/termo.out", "w")) == NULL) {
+ fprintf(stderr, "Outfile open error!\n");
+ exit(0);
+ }
+ fd = open("/dev/null", O_NOCTTY | O_RDWR, 0666);
+ dup2(0, fd);
+ dup2(1, fd);
+ dup2(2, fd);
+ setsid();
+ signal (SIGTERM, SIG_IGN);
+ signal (SIGINT, SIG_IGN);
+ signal (SIGHUP, SIG_IGN);
+ serialfd = initSerialPort(INDEVICE);
+ if(setgid(NOBODY_GROUP) != 0) {fprintf(fp, "GRP ch ERR\n");return 1;}
+ if(setuid(NOBODY_USER) != 0) {fprintf(fp, "USER ch ERR\n");return 1;}
+
+ return rundaemon();
+ break;
+ default:
+ exit(0);
+ }
+ return 0;
+}
+/**
+ * End og Peters DAEMON code
+ */
+#endif
+
/**
* This function starts the MIaV server.
*/
diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc
index 7bffc50..0a478fc 100644
--- a/src/mov_encoder.cc
+++ b/src/mov_encoder.cc
@@ -39,28 +39,26 @@
/*
* $Log$
- * Revision 1.23 2005/05/19 14:10:22 deva
+ * Revision 1.24 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
*
+ * Revision 1.23 2005/05/19 14:10:22 deva
* Multithreading rulez?
*
* Revision 1.22 2005/05/17 19:16:26 deva
- *
* Made new mpeg writer work, with proper file permissions.
*
* Revision 1.21 2005/05/17 14:30:56 deva
* Added code, preparing threaded encoding.
*
* Revision 1.20 2005/05/16 16:00:57 deva
- *
* Lots of stuff!
*
* Revision 1.19 2005/05/16 13:25:52 deva
- *
* Moved video setting to configuration file.
* Fine tuned setting for 2.4ghz server
*
* Revision 1.18 2005/05/16 11:13:24 deva
- *
* Optimized some encoding parameters.
*
* Revision 1.17 2005/05/16 10:45:10 deva
@@ -102,10 +100,10 @@
#include "miav_config.h"
#include "debug.h"
-//av_alloc_format_context
-//av_destruct_packet_nofree
-MovEncoder::MovEncoder()
+MovEncoder::MovEncoder(sem_t *r_sem,
+ FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex,
+ FramePriorityQueue *out, sem_t *out_sem, pthread_mutex_t *out_mutex)
{
// FIXME: Hmmm... should this be detected somewhere?!
int w = 720;
@@ -212,12 +210,18 @@ MovEncoder::MovEncoder()
fame_init(fame_context, &fame_par, fame_buffer, FAME_BUFFER_SIZE);
+ running = true;
- // Thread stuff
- sem_init(&sem, 0, 0);
- sem_init(&done, 0, 0);
+ inputqueue = in;
+ outputqueue = out;
- running = true;
+ input_sem = in_sem;
+ output_sem = out_sem;
+
+ read_sem = r_sem;
+
+ input_mutex = in_mutex;
+ output_mutex = out_mutex;
}
MovEncoder::~MovEncoder()
@@ -265,7 +269,7 @@ Frame *MovEncoder::encode_video(Frame *dvframe)
dvdecoder->num_dif_seqs = 12;
}
- pixels[ 0 ] = rgb; // We use this as the output buffer
+ pixels[ 0 ] = picture; // We use this as the output buffer
pitches[ 0 ] = w * 2;
dv_decode_full_frame(dvdecoder,
@@ -279,7 +283,7 @@ Frame *MovEncoder::encode_video(Frame *dvframe)
uint8_t *y = yuv.y;
uint8_t *cb = yuv.u;
uint8_t *cr = yuv.v;
- uint8_t *p = rgb;
+ uint8_t *p = picture;
for ( int i = 0; i < h; i += 2 ) {
// process two scanlines (one from each field, interleaved)
@@ -326,59 +330,51 @@ void MovEncoder::encode_audio(Frame *dvframe)
// TODO: Do some audio stuff here sometime!
}
-void MovEncoder::encodeSequence(Queue<Frame> *queue)
-{
- // set input queue
- inputqueue = queue;
-
- // unlock semaphore
- sem_post(&sem);
-}
-
-
// this runs in a thread
void MovEncoder::run()
{
- fprintf(stderr, "Encoder Ready\n"); fflush(stderr);
+ FrameVector *item;
+ Frame *in_frame;
+ Frame *out_frame;
+
+ fprintf(stderr, "\t\t\t\tEncoder Ready\n"); fflush(stderr);
while(running) {
- // wait for semaphore
- // lock semaphore
- sem_wait(&sem);
- if(inputqueue == NULL) continue;
+ sem_wait(input_sem);
- fprintf(stderr, "."); fflush(stderr);
+ fprintf(stderr, "\t\t\t\tReading block\n"); fflush(stderr);
- // allocate new output queue
- outputqueue = new Queue<Frame>();
- // while input queue.pop
- Frame *fi, *fo;
- while(inputqueue->length() > 0) {
- fi = inputqueue->pop();
+ // Lock inout mutex
+ pthread_mutex_lock(input_mutex);
+ item = inputqueue->front();
+ inputqueue->pop();
+ pthread_mutex_unlock(input_mutex);
+ // Unlock input mutex
- // encode frame from input queue
- fo = encode(fi);
+ if(!item) {
+ fprintf(stderr, "\t\t\t\tEmpty block detected.\n"); fflush(stderr);
+ continue;
+ }
- // and push result on output queue
- outputqueue->push(fo);
+ for(int cnt = 0; cnt < item->size(); cnt++) {
+ in_frame = item->at(cnt);
+ out_frame = encode(in_frame);
+ out_frame->number = in_frame->number;
- // delete the input frame
- //delete fi;
- }
- // delete input queue
- delete inputqueue;
- sem_post(&done);
- }
-}
+ delete in_frame;
-Queue<Frame> *MovEncoder::getResultSequence()
-{
- // wait for sempahore
- sem_wait(&done);
+ // Lock output mutex
+ pthread_mutex_lock(output_mutex);
+ outputqueue->push(out_frame);
+ pthread_mutex_unlock(output_mutex);
+ // Unlock output mutex
+
+ fprintf(stderr, "\t\t\t\tEncoded [%d] - pushed it for writing\n", in_frame->number); fflush(stderr);
+ }
- // fprintf(stderr, "POP!\n"); fflush(stderr);
+ delete item;
- // return output queue
- return outputqueue;
+ sem_post(read_sem);
+ sem_post(output_sem);
+ }
}
-
diff --git a/src/mov_encoder.h b/src/mov_encoder.h
index 13eb63b..d7e0c17 100644
--- a/src/mov_encoder.h
+++ b/src/mov_encoder.h
@@ -36,19 +36,19 @@
/*
* $Log$
- * Revision 1.9 2005/05/19 14:10:22 deva
+ * Revision 1.10 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
*
+ * Revision 1.9 2005/05/19 14:10:22 deva
* Multithreading rulez?
*
* Revision 1.8 2005/05/17 14:30:56 deva
* Added code, preparing threaded encoding.
*
* Revision 1.7 2005/05/09 16:40:20 deva
- *
* Added optimize yuv conversion code
*
* Revision 1.6 2005/05/05 20:41:38 deva
- *
* Removed the last pieces of ffmpeg... replaced it with libfame...
* Not quite working yet, but all the major code is in place!
*
@@ -66,6 +66,7 @@
#include <stdlib.h>
#include <string.h>
+
// Use libfame
#include <fame.h>
@@ -73,52 +74,57 @@
#include <libdv/dv.h>
#include <libdv/dv_types.h>
+#include <vector>
+using namespace std;
+
#include "frame.h"
-#include "queue.h"
#include "util.h"
#include "thread.h"
+#include <pthread.h>
// size specifies the length of the buffer.
#define FAME_BUFFER_SIZE (2*720*576*4) // FIXME: One size fits all...
class MovEncoder : public Thread {
public:
- MovEncoder();
+ MovEncoder(sem_t *r_sem,
+ FrameVectorQueue *in, sem_t *in_sem, pthread_mutex_t *in_mutex,
+ FramePriorityQueue *out, sem_t *out_sem, pthread_mutex_t *out_mutex);
~MovEncoder();
Frame* encode(Frame *frame);
void run();
- Queue<Frame> *getResultSequence();
- void encodeSequence(Queue<Frame> *queue);
-
volatile bool running;
private:
- // Input queue
- Queue<Frame> *inputqueue;
- Queue<Frame> *outputqueue;
+ // Input/Output queues
+ FrameVectorQueue *inputqueue;
+ FramePriorityQueue *outputqueue;
//thread stuff
- sem_t sem;
- sem_t done;
+ sem_t *input_sem;
+ sem_t *output_sem;
+
+ sem_t *read_sem;
+ pthread_mutex_t *input_mutex;
+ pthread_mutex_t *output_mutex;
+
Frame *encode_video(Frame *frame);
void encode_audio(Frame *frame);
- // buffer is the buffer where encoded data will be written to. It must be large
- // enough to contain a few frames.
+ // libFAME encoder
unsigned char *fame_buffer;
fame_parameters_t fame_par;
fame_context_t *fame_context;
fame_yuv_t yuv;
- FILE *f;
// libdv decoder
dv_decoder_t *dvdecoder;
- unsigned char rgb[FAME_BUFFER_SIZE];
+ unsigned char picture[FAME_BUFFER_SIZE];
};
#endif
diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc
index ed71a31..d95961d 100644
--- a/src/mov_encoder_thread.cc
+++ b/src/mov_encoder_thread.cc
@@ -31,6 +31,9 @@
/*
* $Log$
+ * Revision 1.7 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.6 2005/05/19 14:10:22 deva
*
* Multithreading rulez?
@@ -58,120 +61,123 @@
MovEncoderThread::MovEncoderThread(const char *filename)
{
- file = open(filename,
- O_CREAT | O_WRONLY, //| O_LARGEFILE
- S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
- if(file == -1) {
- fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno));
- return;
- }
+ outputqueue = new FramePriorityQueue();
+ inputqueue = new FrameVectorQueue();
+ block = new FrameVector();
+
+ num_frames_in_block = config->readString("frame_sequence")->length();
+ fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr);
threads = config->readInt("encoding_threads");
+ // Thread stuff
+ sem_init(&in_sem, 0, 0);
+ sem_init(&out_sem, 0, 0);
+ sem_init(&read_sem, 0, 0);
+
+ for(int cnt = 0; cnt < threads; cnt++) sem_post(&read_sem);
+
+ pthread_mutex_init (&input_mutex, NULL);
+ pthread_mutex_init (&output_mutex, NULL);
+
+ writer = new MovEncoderWriter(filename, outputqueue, &out_sem, &output_mutex);
+ writer_tid = new pthread_t;
+ pthread_create (writer_tid, NULL, thread_run, writer);
+
for(int cnt = 0; cnt < threads; cnt++) {
- encs.push_back(new MovEncoder());
+ encs.push_back(new MovEncoder(&read_sem,
+ inputqueue, &in_sem, &input_mutex,
+ outputqueue, &out_sem, &output_mutex));
tids.push_back(new pthread_t);
pthread_create (tids[cnt], NULL, thread_run, encs[cnt]);
}
- current_encoder = 0;
current_frame = 0;
-
- num_frames_in_block = config->readString("frame_sequence")->length();
- fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr);
-
- inputqueue = new Queue<Frame>();
+ frame_number = 0;
}
MovEncoderThread::~MovEncoderThread()
{
-
- fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr);
-
- // Push any hanging frames.
- encs[current_encoder]->encodeSequence(inputqueue);
-
- fprintf(stderr, "Clear - Readback\n"); fflush(stderr);
-
- /*
- * Readback mode
- */
- for(int cnt = 0; cnt <= current_encoder; cnt++) {
-
- // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr);
-
- Queue<Frame> *outputqueue = encs[cnt]->getResultSequence();
- Frame *f;
- while((f = outputqueue->pop())) {
- int i = write(file, f->data, f->size);
- if(i == -1) perror("Write failed");
-
- // fprintf(stderr, "wrote[%d]-", i); fflush(stderr);
-
- delete f;
- }
- }
-
- fprintf(stderr, "Clear - join threads\n"); fflush(stderr);
+ // These should not be deleted here... its done elsewhere.
+ inputqueue = NULL;
+ sem_post(&out_sem);
+
+ // Stop the encoding threads.
for(int cnt = 0; cnt < threads; cnt++) {
encs[cnt]->running = false;
- encs[cnt]->encodeSequence(NULL);
+ }
+
+ // Kick them to initiate the exit.
+ for(int cnt = 0; cnt < threads; cnt++) {
+ sem_post(&in_sem);
+ }
+
+ // They should be exited now, so we can delete them.
+ for(int cnt = 0; cnt < threads; cnt++) {
pthread_join(*tids[cnt], NULL);
delete encs[cnt];
+ delete tids[cnt];
}
+
+ // Tell the writer to stop
+ writer->running = false;
- fprintf(stderr, "Clear - close file\n"); fflush(stderr);
+ // Kick it to make it stop.
+ sem_post(&out_sem);
- if(file != -1) close(file);
+ // Destroy the thread
+ pthread_join(*writer_tid, NULL);
+ delete writer_tid;
- fprintf(stderr, "Clear - done\n"); fflush(stderr);
+ // delete the writer (end thereby close the file)
+ delete writer;
+ // Destroy the semaphores.
+ sem_destroy(&in_sem);
+ sem_destroy(&out_sem);
+ sem_destroy(&read_sem);
}
void MovEncoderThread::encode(Frame* frame)
{
- if(file == -1) return;
-
- // fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr);
- inputqueue->bpush(frame);
-
- /*
- * Encode mode
- */
- // Switch frame
- current_frame++;
- if(current_frame >= num_frames_in_block) {
-
- // fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr);
+ if(frame == NULL) {
+ fprintf(stderr, "NULL frame detected.\n");
+ // Terminate
+ return;
+ }
- encs[current_encoder]->encodeSequence(inputqueue);
- inputqueue = new Queue<Frame>();
+ frame->number = frame_number;
+ block->push_back(frame);
- // Switch encoder
+ // Switch frame
+ if(block->size() == num_frames_in_block) {
+ // Wait until a free encoder.
+ /*
+ int val;
+ sem_getvalue(&read_sem, &val);
+ fprintf(stderr, "Sem Value: %d\n", val); fflush(stderr);
+ */
+ sem_wait(&read_sem);
+
+ // Lock input mutex
+ pthread_mutex_lock(&input_mutex);
+ inputqueue->push(block);
+ pthread_mutex_unlock(&input_mutex);
+ // Unlock input mutex
+
+ fprintf(stderr, "Frame vector [%d-%d] pushed\n",
+ block->at(0)->number,
+ block->at(block->size() - 1)->number);
+ fflush(stderr);
+
+ // Kick encoders
+ sem_post(&in_sem);
+
+ // Start new block
current_frame = 0;
- current_encoder++;
- if(current_encoder >= threads) {
- // switch mode
- /*
- * Readback mode
- */
- for(int cnt = 0; cnt < threads; cnt++) {
-
- // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr);
-
- Queue<Frame> *outputqueue = encs[cnt]->getResultSequence();
- Frame *f;
- while((f = outputqueue->pop())) {
- int i = write(file, f->data, f->size);
- if(i == -1) perror("Write failed");
-
- // fprintf(stderr, "wrote[%d]-", i); fflush(stderr);
-
- delete f;
- }
- }
- current_encoder = 0;
- }
+ block = new FrameVector;
}
+
+ frame_number ++;
}
diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h
index 60f4c5c..989dd87 100644
--- a/src/mov_encoder_thread.h
+++ b/src/mov_encoder_thread.h
@@ -31,6 +31,9 @@
/*
* $Log$
+ * Revision 1.5 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.4 2005/05/19 14:10:22 deva
*
* Multithreading rulez?
@@ -62,6 +65,7 @@
using namespace std;
#include "mov_encoder.h"
+#include "mov_encoder_writer.h"
class MovEncoderThread {
public:
@@ -71,16 +75,29 @@ public:
void encode(Frame* frame);
private:
- Queue<Frame> *inputqueue;
+ FrameVectorQueue *inputqueue;
+ FramePriorityQueue *outputqueue;
+ FrameVector *block;
+
+ //thread stuff
+ sem_t in_sem;
+ sem_t out_sem;
+
+ sem_t read_sem;
+
+ pthread_mutex_t input_mutex;
+ pthread_mutex_t output_mutex;
// Used for encoder switching
- int current_encoder;
int current_frame;
+ unsigned int frame_number;
int num_frames_in_block;
+ MovEncoderWriter *writer;
+ pthread_t* writer_tid;
+
int threads;
- int file;
vector<MovEncoder*> encs;
vector<pthread_t*> tids;
};
diff --git a/src/mov_encoder_writer.cc b/src/mov_encoder_writer.cc
new file mode 100644
index 0000000..941ef85
--- /dev/null
+++ b/src/mov_encoder_writer.cc
@@ -0,0 +1,117 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * mov_encoder_writer.cc
+ *
+ * Sun May 22 12:51:36 CEST 2005
+ * Copyright 2005 Bent Bisballe
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+
+/*
+ * $Id$
+ */
+
+/*
+ * $Log$
+ * Revision 1.1 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
+ */
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <unistd.h>
+
+#include <pthread.h>
+#include <semaphore.h>
+
+#include <errno.h>
+
+#include <config.h>
+#include "mov_encoder_writer.h"
+
+MovEncoderWriter::MovEncoderWriter(const char* filename, FramePriorityQueue *q, sem_t *s, pthread_mutex_t *m)
+{
+ file = open(filename,
+ O_CREAT | O_WRONLY, //| O_LARGEFILE
+ S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH);
+ if(file == -1) {
+ fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno));
+ return;
+ }
+
+ sem = s;
+ queue = q;
+ frame_number = 0;
+ mutex = m;
+
+ running = true;
+}
+
+MovEncoderWriter::~MovEncoderWriter()
+{
+ if(file != -1) close(file);
+}
+
+void MovEncoderWriter::run()
+{
+ fprintf(stderr, "\t\t\t\t\t\t\t\t\tFile Writer ready!\n"); fflush(stderr);
+
+ Frame *frame;
+
+ if(file == -1) return;
+
+ while(running) {
+ sem_wait(sem);
+
+ if(queue->size() == 0) continue;
+
+ pthread_mutex_lock(mutex);
+ frame = queue->top();
+ if(frame->number == frame_number) queue->pop();
+ pthread_mutex_unlock(mutex);
+
+ /*
+ if(!frame) {
+ fprintf(stderr, "\t\t\t\t\t\t\t\t\tNULL frame detected");
+ continue;
+ }
+ */
+
+ fprintf(stderr, "\t\t\t\t\t\t\t\t\tChecking frame [%d] against expected [%d]\n",
+ frame->number, frame_number); fflush(stderr);
+ while(frame->number == frame_number) {
+
+ write(file, frame->data, frame->size);
+ delete frame;
+
+ fprintf(stderr, "\t\t\t\t\t\t\t\t\tWrite frame [%d]\n", frame->number); fflush(stderr);
+
+ frame_number++;
+
+ pthread_mutex_lock(mutex);
+ frame = queue->top();
+ if(frame->number == frame_number) queue->pop();
+ pthread_mutex_unlock(mutex);
+
+ }
+ }
+}
diff --git a/src/mov_encoder_writer.h b/src/mov_encoder_writer.h
new file mode 100644
index 0000000..44647a0
--- /dev/null
+++ b/src/mov_encoder_writer.h
@@ -0,0 +1,67 @@
+/* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
+/***************************************************************************
+ * mov_encoder_writer.h
+ *
+ * Sun May 22 12:51:35 CEST 2005
+ * Copyright 2005 Bent Bisballe
+ * deva@aasimon.org
+ ****************************************************************************/
+
+/*
+ * This file is part of MIaV.
+ *
+ * MIaV is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * MIaV is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with MIaV; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
+ */
+
+/*
+ * $Id$
+ */
+
+/*
+ * $Log$
+ * Revision 1.1 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
+ */
+
+#include <config.h>
+#ifndef __MIAV_MOV_ENCODER_WRITER_H__
+#define __MIAV_MOV_ENCODER_WRITER_H__
+
+#include "frame.h"
+#include "thread.h"
+
+class MovEncoderWriter : public Thread {
+public:
+ MovEncoderWriter(const char* filename, FramePriorityQueue *q, sem_t *s, pthread_mutex_t *m);
+ ~MovEncoderWriter();
+
+ void run();
+
+ volatile bool running;
+
+private:
+ int file;
+
+ FramePriorityQueue *queue;
+ pthread_mutex_t *mutex;
+
+ sem_t *sem;
+
+ unsigned int frame_number;
+};
+
+
+#endif/*__MIAV_MOV_ENCODER_WRITER_H__*/
diff --git a/src/queue.h b/src/queue.h
index 7c56e93..fa03c8e 100644
--- a/src/queue.h
+++ b/src/queue.h
@@ -38,6 +38,9 @@
/*
* $Log$
+ * Revision 1.17 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.16 2005/05/19 14:10:22 deva
*
* Multithreading rulez?
@@ -86,7 +89,6 @@ public:
~Queue();
void push(T *t);
- void bpush(T *t);
T *pop();
T *peek();
@@ -174,47 +176,6 @@ void Queue<T>::push(T *t)
}
/**
- * Push element on queue from the bottom.
- */
-template<typename T>
-void Queue<T>::bpush(T *t)
-{
- if(locked) {
- delete t;
- return;
- }
-
- pthread_mutex_lock(&mutex);
-
- buf_t *b = (buf_t*)xmalloc(sizeof(*b));
- b->data = (void*)t;
-
- assert(b != NULL);
-
- if(limit && count > 0) {
- T* tmp = (T*)_pop();
- delete tmp;
- }
-
- if(!head) {
- head = tail = b;
- b->next = b->prev = NULL;
- count = 1;
- pthread_mutex_unlock(&mutex);
- return;
- }
-
- b->prev = head;
- b->next = NULL;
- if(head)
- head->next = b;
- head = b;
- count++;
-
- pthread_mutex_unlock(&mutex);
-}
-
-/**
* Pop element from queue.
* If queue is empty, NULL is returned.
*/
diff --git a/src/server.cc b/src/server.cc
index 135fb27..a79f463 100644
--- a/src/server.cc
+++ b/src/server.cc
@@ -31,6 +31,9 @@
/*
* $Log$
+ * Revision 1.18 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.17 2005/05/17 19:16:26 deva
*
* Made new mpeg writer work, with proper file permissions.
@@ -344,7 +347,8 @@ void newConnection(Socket *socket)
if(freeze_frame) delete freeze_frame;
freeze_frame = frame;
} else {
- delete frame;
+ // Never delete the frames here!
+ //delete frame;
}
frame = new Frame(NULL, DVPACKAGE_SIZE);
diff --git a/src/server_status.cc b/src/server_status.cc
index ab155e4..23c6c3d 100644
--- a/src/server_status.cc
+++ b/src/server_status.cc
@@ -31,6 +31,9 @@
/*
* $Log$
+ * Revision 1.8 2005/05/22 15:49:22 deva
+ * Added multithreaded encoding support.
+ *
* Revision 1.7 2005/05/17 15:12:51 deva
* Fixed file rights (All read on files and directories, and all execute on directories).
*
@@ -77,6 +80,8 @@ ServerStatus::~ServerStatus()
void ServerStatus::checkPoint()
{
+ return;
+
static int frame = 0;
frame++;
if(frame % UPD != 0) return;