summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordeva <deva>2005-05-19 14:10:22 +0000
committerdeva <deva>2005-05-19 14:10:22 +0000
commitd74c7a00c417cffdc93a82efa2841e23d823bea6 (patch)
treeea7f7b69ccbd0dc1df1ea5e05dd59cfafa194f25
parenta597454b7ce1b931e3e4117e6fed509cc22517ff (diff)
Multithreading rulez?R0_2_2
-rw-r--r--TODO4
-rw-r--r--etc/miav.conf8
-rw-r--r--src/mov_encoder.cc72
-rw-r--r--src/mov_encoder.h29
-rw-r--r--src/mov_encoder_thread.cc96
-rw-r--r--src/mov_encoder_thread.h7
-rw-r--r--src/queue.h49
7 files changed, 241 insertions, 24 deletions
diff --git a/TODO b/TODO
index 0151357..d194079 100644
--- a/TODO
+++ b/TODO
@@ -13,7 +13,6 @@ http://www.linuxmanpages.com/man3/fame_start_frame.3.php
// YUV420 format specification
http://encyclopedia.laborlawtalk.com/YUV_4:2:0
-- file permissions
- multithreded encoding
- fopen med create unique
@@ -97,7 +96,8 @@ Main:
[x] - Use correct filenames and paths.
[x] - Check for writabilty before trying to do any writing.
[x] - Create fallback, when unable to write the requested filename.
- [ ] - Permissions on sesrver file writing to be read from config.
+ [x] - Permissions on server file writing. Files: u+wr g+r a+x - Directories: u+wrx g+rx a+rx.
+ [ ] - Permissions on server file writing to be read from config.
==========================================================================
TASKS (common)
diff --git a/etc/miav.conf b/etc/miav.conf
index a0692a2..476bbef 100644
--- a/etc/miav.conf
+++ b/etc/miav.conf
@@ -17,11 +17,11 @@ pixel_width = 1024
pixel_height = 768
# How and where to connect to the miav server?
-server_addr = "10.3.20.232"
+server_addr = "127.0.0.1"
server_port = 18120
# Where top store the files recieved by the server
-server_root = "/tmp/miav_files"
+server_root = "/home/miav/miav_files"
# Video output controls. A sequence of I and P, where I is keyframes
# which is fast to create, but uses a lot of discspace.
@@ -31,3 +31,7 @@ frame_sequence = "IIPIP"
# quality in % - 100% is best quality
frame_quality = 80
+
+# The number of threads started for paralel encoding on the server
+# (for multiprocessor systems)
+encoding_threads = 4
diff --git a/src/mov_encoder.cc b/src/mov_encoder.cc
index 172051d..7bffc50 100644
--- a/src/mov_encoder.cc
+++ b/src/mov_encoder.cc
@@ -39,6 +39,10 @@
/*
* $Log$
+ * Revision 1.23 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.22 2005/05/17 19:16:26 deva
*
* Made new mpeg writer work, with proper file permissions.
@@ -104,8 +108,8 @@
MovEncoder::MovEncoder()
{
// FIXME: Hmmm... should this be detected somewhere?!
- static int w = 720;
- static int h = 576;
+ int w = 720;
+ int h = 576;
// Initialize yuv strucutre.
yuv.w = w;
@@ -207,6 +211,13 @@ MovEncoder::MovEncoder()
fame_init(fame_context, &fame_par, fame_buffer, FAME_BUFFER_SIZE);
+
+
+ // Thread stuff
+ sem_init(&sem, 0, 0);
+ sem_init(&done, 0, 0);
+
+ running = true;
}
MovEncoder::~MovEncoder()
@@ -314,3 +325,60 @@ void MovEncoder::encode_audio(Frame *dvframe)
{
// TODO: Do some audio stuff here sometime!
}
+
+void MovEncoder::encodeSequence(Queue<Frame> *queue)
+{
+ // set input queue
+ inputqueue = queue;
+
+ // unlock semaphore
+ sem_post(&sem);
+}
+
+
+// this runs in a thread
+void MovEncoder::run()
+{
+ fprintf(stderr, "Encoder Ready\n"); fflush(stderr);
+
+ while(running) {
+ // wait for semaphore
+ // lock semaphore
+ sem_wait(&sem);
+ if(inputqueue == NULL) continue;
+
+ fprintf(stderr, "."); fflush(stderr);
+
+ // allocate new output queue
+ outputqueue = new Queue<Frame>();
+ // while input queue.pop
+ Frame *fi, *fo;
+ while(inputqueue->length() > 0) {
+ fi = inputqueue->pop();
+
+ // encode frame from input queue
+ fo = encode(fi);
+
+ // and push result on output queue
+ outputqueue->push(fo);
+
+ // delete the input frame
+ //delete fi;
+ }
+ // delete input queue
+ delete inputqueue;
+ sem_post(&done);
+ }
+}
+
+Queue<Frame> *MovEncoder::getResultSequence()
+{
+ // wait for sempahore
+ sem_wait(&done);
+
+ // fprintf(stderr, "POP!\n"); fflush(stderr);
+
+ // return output queue
+ return outputqueue;
+}
+
diff --git a/src/mov_encoder.h b/src/mov_encoder.h
index 24525f2..13eb63b 100644
--- a/src/mov_encoder.h
+++ b/src/mov_encoder.h
@@ -36,6 +36,10 @@
/*
* $Log$
+ * Revision 1.9 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.8 2005/05/17 14:30:56 deva
* Added code, preparing threaded encoding.
*
@@ -70,19 +74,36 @@
#include <libdv/dv_types.h>
#include "frame.h"
-
+#include "queue.h"
#include "util.h"
+#include "thread.h"
+
// size specifies the length of the buffer.
#define FAME_BUFFER_SIZE (2*720*576*4) // FIXME: One size fits all...
-class MovEncoder {
- public:
+class MovEncoder : public Thread {
+public:
MovEncoder();
~MovEncoder();
Frame* encode(Frame *frame);
- private:
+ void run();
+
+ Queue<Frame> *getResultSequence();
+ void encodeSequence(Queue<Frame> *queue);
+
+ volatile bool running;
+
+private:
+ // Input queue
+ Queue<Frame> *inputqueue;
+ Queue<Frame> *outputqueue;
+
+ //thread stuff
+ sem_t sem;
+ sem_t done;
+
Frame *encode_video(Frame *frame);
void encode_audio(Frame *frame);
diff --git a/src/mov_encoder_thread.cc b/src/mov_encoder_thread.cc
index be86377..ed71a31 100644
--- a/src/mov_encoder_thread.cc
+++ b/src/mov_encoder_thread.cc
@@ -31,6 +31,10 @@
/*
* $Log$
+ * Revision 1.6 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.5 2005/05/19 10:55:49 deva
* Test for block encoding of length strlen("IPIPP").
*
@@ -61,43 +65,113 @@ MovEncoderThread::MovEncoderThread(const char *filename)
fprintf(stderr, "Could not open file for writing: %s\n", strerror(errno));
return;
}
- threads = 4;
+
+ threads = config->readInt("encoding_threads");
for(int cnt = 0; cnt < threads; cnt++) {
encs.push_back(new MovEncoder());
+ tids.push_back(new pthread_t);
+ pthread_create (tids[cnt], NULL, thread_run, encs[cnt]);
}
- int current_encoder = 0;
- int current_frame = 0;
+ current_encoder = 0;
+ current_frame = 0;
- int num_frames_in_block = config->readString("frame_sequence")->length();
+ num_frames_in_block = config->readString("frame_sequence")->length();
fprintf(stderr, "Frame sequence length [%d]\n", num_frames_in_block); fflush(stderr);
+
+ inputqueue = new Queue<Frame>();
}
MovEncoderThread::~MovEncoderThread()
{
+
+ fprintf(stderr, "Clear - encode last[%d]\n", current_encoder); fflush(stderr);
+
+ // Push any hanging frames.
+ encs[current_encoder]->encodeSequence(inputqueue);
+
+ fprintf(stderr, "Clear - Readback\n"); fflush(stderr);
+
+ /*
+ * Readback mode
+ */
+ for(int cnt = 0; cnt <= current_encoder; cnt++) {
+
+ // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr);
+
+ Queue<Frame> *outputqueue = encs[cnt]->getResultSequence();
+ Frame *f;
+ while((f = outputqueue->pop())) {
+ int i = write(file, f->data, f->size);
+ if(i == -1) perror("Write failed");
+
+ // fprintf(stderr, "wrote[%d]-", i); fflush(stderr);
+
+ delete f;
+ }
+ }
+
+ fprintf(stderr, "Clear - join threads\n"); fflush(stderr);
+
for(int cnt = 0; cnt < threads; cnt++) {
+ encs[cnt]->running = false;
+ encs[cnt]->encodeSequence(NULL);
+ pthread_join(*tids[cnt], NULL);
delete encs[cnt];
}
+
+ fprintf(stderr, "Clear - close file\n"); fflush(stderr);
+
if(file != -1) close(file);
+
+ fprintf(stderr, "Clear - done\n"); fflush(stderr);
+
}
void MovEncoderThread::encode(Frame* frame)
{
if(file == -1) return;
- Frame *enc_frame = encs[current_encoder]->encode(frame);
- // fprintf(stderr, "[%d]", enc_frame->size); fflush(stderr);
- int i = write(file, enc_frame->data, enc_frame->size);
- if(i == -1) perror("Write failed");
- delete enc_frame;
-
+ // fprintf(stderr, "build[%d]-", current_encoder); fflush(stderr);
+ inputqueue->bpush(frame);
+
+ /*
+ * Encode mode
+ */
// Switch frame
current_frame++;
if(current_frame >= num_frames_in_block) {
+
+ // fprintf(stderr, "push[%d]-", current_encoder); fflush(stderr);
+
+ encs[current_encoder]->encodeSequence(inputqueue);
+ inputqueue = new Queue<Frame>();
+
// Switch encoder
current_frame = 0;
current_encoder++;
- if(current_encoder >= threads) current_encoder = 0;
+ if(current_encoder >= threads) {
+ // switch mode
+ /*
+ * Readback mode
+ */
+ for(int cnt = 0; cnt < threads; cnt++) {
+
+ // fprintf(stderr, "pop[%d]-", cnt); fflush(stderr);
+
+ Queue<Frame> *outputqueue = encs[cnt]->getResultSequence();
+ Frame *f;
+ while((f = outputqueue->pop())) {
+ int i = write(file, f->data, f->size);
+ if(i == -1) perror("Write failed");
+
+ // fprintf(stderr, "wrote[%d]-", i); fflush(stderr);
+
+ delete f;
+ }
+ }
+ current_encoder = 0;
+ }
}
}
diff --git a/src/mov_encoder_thread.h b/src/mov_encoder_thread.h
index 1e91b58..60f4c5c 100644
--- a/src/mov_encoder_thread.h
+++ b/src/mov_encoder_thread.h
@@ -31,6 +31,10 @@
/*
* $Log$
+ * Revision 1.4 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.3 2005/05/19 10:55:49 deva
* Test for block encoding of length strlen("IPIPP").
*
@@ -67,6 +71,8 @@ public:
void encode(Frame* frame);
private:
+ Queue<Frame> *inputqueue;
+
// Used for encoder switching
int current_encoder;
int current_frame;
@@ -76,6 +82,7 @@ private:
int threads;
int file;
vector<MovEncoder*> encs;
+ vector<pthread_t*> tids;
};
#endif/*__MIAV_MOV_ENCODER_THREAD_H__*/
diff --git a/src/queue.h b/src/queue.h
index de7b8ff..7c56e93 100644
--- a/src/queue.h
+++ b/src/queue.h
@@ -38,6 +38,10 @@
/*
* $Log$
+ * Revision 1.16 2005/05/19 14:10:22 deva
+ *
+ * Multithreading rulez?
+ *
* Revision 1.15 2005/05/16 16:00:57 deva
*
* Lots of stuff!
@@ -82,6 +86,7 @@ public:
~Queue();
void push(T *t);
+ void bpush(T *t);
T *pop();
T *peek();
@@ -127,9 +132,6 @@ Queue<T>::~Queue()
pthread_mutex_destroy(&mutex);
}
-
-
-
/**
* Push element on queue.
*/
@@ -172,6 +174,47 @@ void Queue<T>::push(T *t)
}
/**
+ * Push element on queue from the bottom.
+ */
+template<typename T>
+void Queue<T>::bpush(T *t)
+{
+ if(locked) {
+ delete t;
+ return;
+ }
+
+ pthread_mutex_lock(&mutex);
+
+ buf_t *b = (buf_t*)xmalloc(sizeof(*b));
+ b->data = (void*)t;
+
+ assert(b != NULL);
+
+ if(limit && count > 0) {
+ T* tmp = (T*)_pop();
+ delete tmp;
+ }
+
+ if(!head) {
+ head = tail = b;
+ b->next = b->prev = NULL;
+ count = 1;
+ pthread_mutex_unlock(&mutex);
+ return;
+ }
+
+ b->prev = head;
+ b->next = NULL;
+ if(head)
+ head->next = b;
+ head = b;
+ count++;
+
+ pthread_mutex_unlock(&mutex);
+}
+
+/**
* Pop element from queue.
* If queue is empty, NULL is returned.
*/