From ba2094c0c735efc2677ab8c24bdf70ab70874a30 Mon Sep 17 00:00:00 2001 From: Bent Bisballe Nyeng Date: Fri, 4 May 2012 11:52:36 +0200 Subject: Make observe/unobserve work. Store message_t instead of std::string in message queue for client write callback. --- src/connectionhandler.cc | 2 +- src/munia_proto.cc | 116 +++++++++++++++++++++++++++++++++++++---------- 2 files changed, 92 insertions(+), 26 deletions(-) (limited to 'src') diff --git a/src/connectionhandler.cc b/src/connectionhandler.cc index c8881a3..da94cb8 100644 --- a/src/connectionhandler.cc +++ b/src/connectionhandler.cc @@ -67,7 +67,7 @@ void ConnectionHandler::unobserve(clientid_t clientid, taskid_t taskid) ObserverList ConnectionHandler::observerlist(TaskIdList tasks) { - printf("Observerlist request\n"); + printf("Observerlist request (#tasks: %d)\n", tasks.size()); ObserverList clients; for(TaskIdList::iterator i = tasks.begin(); i != tasks.end(); i++) { diff --git a/src/munia_proto.cc b/src/munia_proto.cc index a91c204..b7123aa 100644 --- a/src/munia_proto.cc +++ b/src/munia_proto.cc @@ -81,7 +81,7 @@ static void dump_handshake_info(struct lws_tokens *lwst) } #endif -static std::map > msgqueue; +static std::map > msgqueue; int callback_lws_task(struct libwebsocket_context * context, struct libwebsocket *wsi, @@ -102,11 +102,26 @@ int callback_lws_task(struct libwebsocket_context * context, break; case LWS_CALLBACK_SERVER_WRITEABLE: + /* { printf("Socket for client %p writable\n", wsi); if(msgqueue[wsi].size() > 0) { - std::string msg = msgqueue[wsi].front(); + message_t msg = msgqueue[wsi].front(); msgqueue[wsi].pop(); + + std::string msgcmd = msg_tostring(msg); + std::string msg; + clientid_t clientid = (*ci).first; + char tidstr[32]; + sprintf(tidstr, "%u", (*ci).second); + printf("\tAdding data to %p's queue\n", clientid); + + msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); + msg.append(tidstr); + msg.append(" "); + msg.append(msgcmd); + msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); + int n = libwebsocket_write(wsi, (unsigned char *) msg.c_str() + LWS_SEND_BUFFER_PRE_PADDING, @@ -127,6 +142,34 @@ int callback_lws_task(struct libwebsocket_context * context, libwebsocket_callback_on_writable(context, wsi); } } + */ + { + std::string msgstr; + msgstr.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); + + while(msgqueue[wsi].size() > 0) { + message_t msg = msgqueue[wsi].front(); + msgqueue[wsi].pop(); + char buf[32]; + sprintf(buf, "%d", msg.tid); + msgstr += std::string(buf) + " " + msg_tostring(msg); + } + + msgstr.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); + + int n = libwebsocket_write(wsi, (unsigned char *) + msgstr.c_str() + + LWS_SEND_BUFFER_PRE_PADDING, + msgstr.length() - + LWS_SEND_BUFFER_POST_PADDING - + LWS_SEND_BUFFER_PRE_PADDING, + LWS_WRITE_TEXT); + if(n < 0) { + fprintf(stderr, "ERROR writing to socket"); + exit(1); + } + } + break; /* @@ -152,30 +195,53 @@ int callback_lws_task(struct libwebsocket_context * context, MessageList::iterator omi = omsgs.begin(); while(omi != omsgs.end()) { - std::string msgcmd = msg_tostring(*omi); - - printf("%d nodes affected by command\n", omi->nodes.size()); - - std::list > clients = - connection_handler.observerlist(omi->nodes); - printf("Writing message to %d clients\n", clients.size()); - - std::list >::iterator ci = clients.begin(); - while(ci != clients.end()) { - std::string msg; - clientid_t clientid = (*ci).first; - char tidstr[32]; - sprintf(tidstr, "%u", (*ci).second); - printf("\tAdding data to %p's queue\n", clientid); - - msg.append((size_t)LWS_SEND_BUFFER_PRE_PADDING, ' '); - msg.append(tidstr); - msg.append(" "); - msg.append(msgcmd); - msg.append((size_t)LWS_SEND_BUFFER_POST_PADDING, ' '); + + if(omi->cmd == cmd::observe) { + + TaskIdList ids = task_manager.subTasks(omi->observe.id); + TaskIdList::iterator id = ids.begin(); + while(id != ids.end()) { + task_t task = task_manager.task(*id); + + message_t createmsg = create_msg_create(task); + message_t updatemsg = create_msg_update(task); + msgqueue[wsi].push(createmsg); + msgqueue[wsi].push(updatemsg); + + id++; + } + + } else if(omi->cmd == cmd::unobserve) { + + TaskIdList ids = task_manager.subTasks(omi->observe.id); + TaskIdList::iterator id = ids.begin(); + while(id != ids.end()) { + task_t task = task_manager.task(*id); - msgqueue[clientid].push(msg); - ci++; + message_t removemsg = create_msg_remove(task); + msgqueue[wsi].push(removemsg); + + id++; + } + + } else { + printf("%d nodes affected by command\n", omi->nodes.size()); + + ObserverList clients = connection_handler.observerlist(omi->nodes); + printf("Writing message to %d clients\n", clients.size()); + + ObserverList::iterator ci = clients.begin(); + while(ci != clients.end()) { + clientid_t clientid = (*ci).first; + taskid_t tid = (*ci).second; + + message_t msg = *omi; + msg.tid = tid; + + msgqueue[clientid].push(msg); + + ci++; + } } omi++; -- cgit v1.2.3