Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * fuse_server_client_thread.cpp - client thread for FuseServer 00004 * 00005 * Created: Tue Nov 13 20:00:55 2007 00006 * Copyright 2005-2007 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. A runtime exception applies to 00014 * this software (see LICENSE.GPL_WRE file mentioned below for details). 00015 * 00016 * This program is distributed in the hope that it will be useful, 00017 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00018 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00019 * GNU Library General Public License for more details. 00020 * 00021 * Read the full text in the LICENSE.GPL_WRE file in the doc directory. 00022 */ 00023 00024 #include <fvutils/net/fuse_server_client_thread.h> 00025 00026 #include <fvutils/net/fuse_server.h> 00027 #include <fvutils/net/fuse_server.h> 00028 #include <fvutils/net/fuse_transceiver.h> 00029 #include <fvutils/net/fuse_message_queue.h> 00030 #include <fvutils/net/fuse_image_content.h> 00031 #include <fvutils/net/fuse_lut_content.h> 00032 #include <fvutils/net/fuse_imagelist_content.h> 00033 #include <fvutils/net/fuse_lutlist_content.h> 00034 #include <fvutils/ipc/shm_image.h> 00035 #include <fvutils/ipc/shm_lut.h> 00036 #include <fvutils/compression/jpeg_compressor.h> 00037 00038 #include <core/exceptions/system.h> 00039 #include <netcomm/socket/stream.h> 00040 #include <netcomm/utils/exceptions.h> 00041 #include <utils/logging/liblogger.h> 00042 00043 #include <netinet/in.h> 00044 #include <cstring> 00045 #include <cstdlib> 00046 00047 using namespace fawkes; 00048 00049 namespace firevision { 00050 #if 0 /* just to make Emacs auto-indent happy */ 00051 } 00052 #endif 00053 00054 /** @class FuseServerClientThread <fvutils/net/fuse_server_client_thread.h> 00055 * FUSE Server Client Thread. 00056 * This thread is instantiated and started for each client that connects to a 00057 * FuseServer. 00058 * @ingroup FUSE 00059 * @ingroup FireVision 00060 * @author Tim Niemueller 00061 */ 00062 00063 /** Constructor. 00064 * @param fuse_server parent FUSE server 00065 * @param s socket to client 00066 */ 00067 FuseServerClientThread::FuseServerClientThread(FuseServer *fuse_server, StreamSocket *s) 00068 : Thread("FuseServerClientThread") 00069 { 00070 __fuse_server = fuse_server; 00071 __socket = s; 00072 __jpeg_compressor = NULL; 00073 00074 __inbound_queue = new FuseNetworkMessageQueue(); 00075 __outbound_queue = new FuseNetworkMessageQueue(); 00076 00077 FUSE_greeting_message_t *greetmsg = (FUSE_greeting_message_t *)malloc(sizeof(FUSE_greeting_message_t)); 00078 greetmsg->version = htonl(FUSE_CURRENT_VERSION); 00079 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_GREETING, 00080 greetmsg, sizeof(FUSE_greeting_message_t))); 00081 00082 __alive = true; 00083 } 00084 00085 00086 /** Destructor. */ 00087 FuseServerClientThread::~FuseServerClientThread() 00088 { 00089 delete __socket; 00090 delete __jpeg_compressor; 00091 00092 for (__bit = __buffers.begin(); __bit != __buffers.end(); ++__bit) { 00093 delete __bit->second; 00094 } 00095 __buffers.clear(); 00096 00097 for (__lit = __luts.begin(); __lit != __luts.end(); ++__lit ) { 00098 delete __lit->second; 00099 } 00100 __luts.clear(); 00101 00102 while ( ! __inbound_queue->empty() ) { 00103 FuseNetworkMessage *m = __inbound_queue->front(); 00104 m->unref(); 00105 __inbound_queue->pop(); 00106 } 00107 00108 while ( ! __outbound_queue->empty() ) { 00109 FuseNetworkMessage *m = __outbound_queue->front(); 00110 m->unref(); 00111 __outbound_queue->pop(); 00112 } 00113 00114 delete __inbound_queue; 00115 delete __outbound_queue; 00116 } 00117 00118 00119 /** Send all messages in outbound queue. */ 00120 void 00121 FuseServerClientThread::send() 00122 { 00123 if ( ! __outbound_queue->empty() ) { 00124 try { 00125 FuseNetworkTransceiver::send(__socket, __outbound_queue); 00126 } catch (Exception &e) { 00127 __fuse_server->connection_died(this); 00128 __alive = false; 00129 } 00130 } 00131 } 00132 00133 00134 /** Receive data. 00135 * Receives data from the network if there is any and then processes all 00136 * inbound messages. 00137 */ 00138 void 00139 FuseServerClientThread::recv() 00140 { 00141 try { 00142 FuseNetworkTransceiver::recv(__socket, __inbound_queue); 00143 } catch (ConnectionDiedException &e) { 00144 __socket->close(); 00145 __fuse_server->connection_died(this); 00146 __alive = false; 00147 } 00148 } 00149 00150 00151 /** Process greeting message. 00152 * @param m received message 00153 */ 00154 void 00155 FuseServerClientThread::process_greeting_message(FuseNetworkMessage *m) 00156 { 00157 FUSE_greeting_message_t *gm = m->msg<FUSE_greeting_message_t>(); 00158 if ( ntohl(gm->version) != FUSE_CURRENT_VERSION ) { 00159 throw Exception("Invalid version on other side"); 00160 } 00161 } 00162 00163 00164 SharedMemoryImageBuffer * 00165 FuseServerClientThread::get_shmimgbuf(const char *id) 00166 { 00167 char tmp_image_id[IMAGE_ID_MAX_LENGTH + 1]; 00168 tmp_image_id[IMAGE_ID_MAX_LENGTH] = 0; 00169 strncpy(tmp_image_id, id, IMAGE_ID_MAX_LENGTH); 00170 00171 if ( (__bit = __buffers.find( tmp_image_id )) == __buffers.end() ) { 00172 // the buffer has not yet been opened 00173 try { 00174 SharedMemoryImageBuffer *b = new SharedMemoryImageBuffer(tmp_image_id); 00175 __buffers[tmp_image_id] = b; 00176 return b; 00177 } catch (Exception &e) { 00178 throw; 00179 } 00180 } else { 00181 return __bit->second; 00182 } 00183 } 00184 00185 00186 /** Process image request message. 00187 * @param m received message 00188 */ 00189 void 00190 FuseServerClientThread::process_getimage_message(FuseNetworkMessage *m) 00191 { 00192 FUSE_imagereq_message_t *irm = m->msg<FUSE_imagereq_message_t>(); 00193 00194 SharedMemoryImageBuffer *b; 00195 try { 00196 b = get_shmimgbuf(irm->image_id); 00197 } catch (Exception &e) { 00198 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED, 00199 m->payload(), m->payload_size(), 00200 /* copy payload */ true); 00201 __outbound_queue->push(nm); 00202 return; 00203 } 00204 00205 if ( irm->format == FUSE_IF_RAW ) { 00206 FuseImageContent *im = new FuseImageContent(b); 00207 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im)); 00208 } else if ( irm->format == FUSE_IF_JPEG ) { 00209 if ( ! __jpeg_compressor) { 00210 __jpeg_compressor = new JpegImageCompressor(); 00211 __jpeg_compressor->set_compression_destination(ImageCompressor::COMP_DEST_MEM); 00212 } 00213 b->lock_for_read(); 00214 __jpeg_compressor->set_image_dimensions(b->width(), b->height()); 00215 __jpeg_compressor->set_image_buffer(b->colorspace(), b->buffer()); 00216 unsigned char *compressed_buffer = (unsigned char *)malloc(__jpeg_compressor->recommended_compressed_buffer_size()); 00217 __jpeg_compressor->set_destination_buffer(compressed_buffer, __jpeg_compressor->recommended_compressed_buffer_size()); 00218 __jpeg_compressor->compress(); 00219 b->unlock(); 00220 size_t compressed_buffer_size = __jpeg_compressor->compressed_size(); 00221 long int sec = 0, usec = 0; 00222 b->capture_time(&sec, &usec); 00223 FuseImageContent *im = new FuseImageContent(FUSE_IF_JPEG, b->image_id(), 00224 compressed_buffer, compressed_buffer_size, 00225 CS_UNKNOWN, b->width(), b->height(), 00226 sec, usec); 00227 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE, im)); 00228 free(compressed_buffer); 00229 } else { 00230 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED, 00231 m->payload(), m->payload_size(), 00232 /* copy payload */ true); 00233 __outbound_queue->push(nm); 00234 } 00235 } 00236 00237 /** Process image info request message. 00238 * @param m received message 00239 */ 00240 void 00241 FuseServerClientThread::process_getimageinfo_message(FuseNetworkMessage *m) 00242 { 00243 FUSE_imagedesc_message_t *idm = m->msg<FUSE_imagedesc_message_t>(); 00244 00245 SharedMemoryImageBuffer *b; 00246 try { 00247 b = get_shmimgbuf(idm->image_id); 00248 00249 FUSE_imageinfo_t *ii = (FUSE_imageinfo_t *)calloc(1, sizeof(FUSE_imageinfo_t)); 00250 00251 strncpy(ii->image_id, b->image_id(), IMAGE_ID_MAX_LENGTH); 00252 ii->colorspace = htons(b->colorspace()); 00253 ii->width = htonl(b->width()); 00254 ii->height = htonl(b->height()); 00255 ii->buffer_size = colorspace_buffer_size(b->colorspace(), b->width(), b->height()); 00256 00257 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_IMAGE_INFO, 00258 ii, sizeof(FUSE_imageinfo_t)); 00259 __outbound_queue->push(nm); 00260 } catch (Exception &e) { 00261 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_IMAGE_FAILED, 00262 m->payload(), m->payload_size(), 00263 /* copy payload */ true); 00264 __outbound_queue->push(nm); 00265 } 00266 } 00267 00268 00269 /** Process LUT request message. 00270 * @param m received message 00271 */ 00272 void 00273 FuseServerClientThread::process_getlut_message(FuseNetworkMessage *m) 00274 { 00275 FUSE_lutdesc_message_t *idm = m->msg<FUSE_lutdesc_message_t>(); 00276 00277 char tmp_lut_id[LUT_ID_MAX_LENGTH + 1]; 00278 tmp_lut_id[LUT_ID_MAX_LENGTH] = 0; 00279 strncpy(tmp_lut_id, idm->lut_id, LUT_ID_MAX_LENGTH); 00280 00281 if ( (__lit = __luts.find( tmp_lut_id )) != __luts.end() ) { 00282 // the buffer had already be opened 00283 FuseLutContent *lm = new FuseLutContent(__lit->second); 00284 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm)); 00285 } else { 00286 try { 00287 SharedMemoryLookupTable *b = new SharedMemoryLookupTable(tmp_lut_id); 00288 __luts[tmp_lut_id] = b; 00289 FuseLutContent *lm = new FuseLutContent(b); 00290 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT, lm)); 00291 } catch (Exception &e) { 00292 // could not open the shared memory segment for some reason, send failure 00293 FuseNetworkMessage *nm = new FuseNetworkMessage(FUSE_MT_GET_LUT_FAILED, 00294 m->payload(), m->payload_size(), 00295 /* copy payload */ true); 00296 __outbound_queue->push(nm); 00297 } 00298 } 00299 } 00300 00301 00302 /** Process LUT setting. 00303 * @param m received message 00304 */ 00305 void 00306 FuseServerClientThread::process_setlut_message(FuseNetworkMessage *m) 00307 { 00308 FuseLutContent *lc = m->msgc<FuseLutContent>(); 00309 FUSE_lutdesc_message_t *reply = (FUSE_lutdesc_message_t *)malloc(sizeof(FUSE_lutdesc_message_t)); 00310 strncpy(reply->lut_id, lc->lut_id(), LUT_ID_MAX_LENGTH); 00311 // Currently we expect colormaps, so make sure we get sensible dimensions 00312 00313 SharedMemoryLookupTable *b; 00314 if ( (__lit = __luts.find( lc->lut_id() )) != __luts.end() ) { 00315 // the buffer had already been opened 00316 b = __lit->second; 00317 } else { 00318 try { 00319 b = new SharedMemoryLookupTable(lc->lut_id(), /* read only */ false); 00320 __luts[lc->lut_id()] = b; 00321 } catch (Exception &e) { 00322 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED, 00323 reply, sizeof(FUSE_lutdesc_message_t))); 00324 e.append("Cannot open shared memory lookup table %s", lc->lut_id()); 00325 LibLogger::log_warn("FuseServerClientThread", e); 00326 delete lc; 00327 return; 00328 } 00329 } 00330 00331 if ( (b->width() != lc->width()) || 00332 (b->height() != lc->height()) || 00333 (b->depth() != lc->depth()) || 00334 (b->bytes_per_cell() != lc->bytes_per_cell()) ) { 00335 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_FAILED, 00336 reply, sizeof(FUSE_lutdesc_message_t))); 00337 LibLogger::log_warn("FuseServerClientThread", "LUT upload: dimensions do not match. " 00338 "Existing (%u,%u,%u,%u) != uploaded (%u,%u,%u,%u)", 00339 b->width(), b->height(), b->depth(), b->bytes_per_cell(), 00340 lc->width(), lc->height(), lc->depth(), lc->bytes_per_cell()); 00341 } else { 00342 b->set(lc->buffer()); 00343 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_SET_LUT_SUCCEEDED, 00344 reply, sizeof(FUSE_lutdesc_message_t))); 00345 } 00346 00347 delete lc; 00348 } 00349 00350 00351 /** Process image list request message. 00352 * @param m received message 00353 */ 00354 void 00355 FuseServerClientThread::process_getimagelist_message(FuseNetworkMessage *m) 00356 { 00357 FuseImageListContent *ilm = new FuseImageListContent(); 00358 00359 SharedMemoryImageBufferHeader *h = new SharedMemoryImageBufferHeader(); 00360 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_IMAGE_MAGIC_TOKEN, h); 00361 SharedMemory::SharedMemoryIterator endi = SharedMemory::end(); 00362 00363 while ( i != endi ) { 00364 const SharedMemoryImageBufferHeader *ih = dynamic_cast<const SharedMemoryImageBufferHeader *>(*i); 00365 if ( ih ) { 00366 ilm->add_imageinfo(ih->image_id(), ih->colorspace(), ih->width(), ih->height()); 00367 } 00368 00369 ++i; 00370 } 00371 00372 delete h; 00373 00374 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_IMAGE_LIST, ilm)); 00375 } 00376 00377 00378 /** Process LUT list request message. 00379 * @param m received message 00380 */ 00381 void 00382 FuseServerClientThread::process_getlutlist_message(FuseNetworkMessage *m) 00383 { 00384 FuseLutListContent *llm = new FuseLutListContent(); 00385 00386 SharedMemoryLookupTableHeader *h = new SharedMemoryLookupTableHeader(); 00387 SharedMemory::SharedMemoryIterator i = SharedMemory::find(FIREVISION_SHM_LUT_MAGIC_TOKEN, h); 00388 SharedMemory::SharedMemoryIterator endi = SharedMemory::end(); 00389 00390 while ( i != endi ) { 00391 const SharedMemoryLookupTableHeader *lh = dynamic_cast<const SharedMemoryLookupTableHeader *>(*i); 00392 if ( lh ) { 00393 llm->add_lutinfo(lh->lut_id(), lh->width(), lh->height(), lh->depth(), lh->bytes_per_cell()); 00394 } 00395 00396 ++i; 00397 } 00398 00399 delete h; 00400 00401 __outbound_queue->push(new FuseNetworkMessage(FUSE_MT_LUT_LIST, llm)); 00402 } 00403 00404 00405 /** Process inbound messages. */ 00406 void 00407 FuseServerClientThread::process_inbound() 00408 { 00409 __inbound_queue->lock(); 00410 while ( ! __inbound_queue->empty() ) { 00411 FuseNetworkMessage *m = __inbound_queue->front(); 00412 00413 try { 00414 switch (m->type()) { 00415 case FUSE_MT_GREETING: 00416 process_greeting_message(m); 00417 break; 00418 case FUSE_MT_GET_IMAGE: 00419 process_getimage_message(m); 00420 break; 00421 case FUSE_MT_GET_IMAGE_INFO: 00422 process_getimageinfo_message(m); 00423 break; 00424 case FUSE_MT_GET_IMAGE_LIST: 00425 process_getimagelist_message(m); 00426 break; 00427 case FUSE_MT_GET_LUT_LIST: 00428 process_getlutlist_message(m); 00429 break; 00430 case FUSE_MT_GET_LUT: 00431 process_getlut_message(m); 00432 break; 00433 case FUSE_MT_SET_LUT: 00434 process_setlut_message(m); 00435 break; 00436 default: 00437 throw Exception("Unknown message type received\n"); 00438 } 00439 } catch (Exception &e) { 00440 e.append("FUSE protocol error"); 00441 LibLogger::log_warn("FuseServerClientThread", e); 00442 __fuse_server->connection_died(this); 00443 __alive = false; 00444 } 00445 00446 m->unref(); 00447 __inbound_queue->pop(); 00448 } 00449 __inbound_queue->unlock(); 00450 } 00451 00452 00453 void 00454 FuseServerClientThread::loop() 00455 { 00456 if ( ! __alive ) { 00457 usleep(10000); 00458 return; 00459 } 00460 00461 short p = 0; 00462 try { 00463 p = __socket->poll(10); // block for up to 10 ms 00464 } catch (InterruptedException &e) { 00465 // we just ignore this and try it again 00466 return; 00467 } 00468 00469 if ( (p & Socket::POLL_ERR) || 00470 (p & Socket::POLL_HUP) || 00471 (p & Socket::POLL_RDHUP)) { 00472 __fuse_server->connection_died(this); 00473 __alive = false; 00474 } else if ( p & Socket::POLL_IN ) { 00475 try { 00476 // Data can be read 00477 recv(); 00478 process_inbound(); 00479 } 00480 catch (...) { 00481 __fuse_server->connection_died(this); 00482 __alive = false; 00483 } 00484 } 00485 00486 if ( __alive ) { 00487 send(); 00488 } 00489 } 00490 00491 } // end namespace firevision