Fawkes API
Fawkes Development Version
|
00001 00002 /*************************************************************************** 00003 * log_thread.cpp - BB Logger Thread 00004 * 00005 * Created: Sun Nov 08 00:02:09 2009 00006 * Copyright 2006-2009 Tim Niemueller [www.niemueller.de] 00007 * 00008 ****************************************************************************/ 00009 00010 /* This program is free software; you can redistribute it and/or modify 00011 * it under the terms of the GNU General Public License as published by 00012 * the Free Software Foundation; either version 2 of the License, or 00013 * (at your option) any later version. 00014 * 00015 * This program is distributed in the hope that it will be useful, 00016 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00017 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00018 * GNU Library General Public License for more details. 00019 * 00020 * Read the full text in the LICENSE.GPL file in the doc directory. 00021 */ 00022 00023 #include "log_thread.h" 00024 #include "file.h" 00025 00026 #include <blackboard/blackboard.h> 00027 #include <utils/logging/logger.h> 00028 #include <core/exceptions/system.h> 00029 #include <interfaces/SwitchInterface.h> 00030 00031 #include <memory> 00032 #include <cstring> 00033 #include <cstdlib> 00034 #include <cstdio> 00035 #include <cerrno> 00036 #include <fcntl.h> 00037 #ifdef __FreeBSD__ 00038 # include <sys/endian.h> 00039 #else 00040 # include <endian.h> 00041 #endif 00042 #include <arpa/inet.h> 00043 #include <sys/stat.h> 00044 #include <sys/mman.h> 00045 00046 using namespace fawkes; 00047 00048 /** @class BBLoggerThread "log_thread.h" 00049 * BlackBoard logger thread. 00050 * One instance of this thread handles logging of one specific interface. 00051 * The plugin will spawn as many threads as there are interfaces to log. This 00052 * allows for maximum concurrency of the writers and avoids a serialization 00053 * bottle neck. 00054 * The log thread can operate in buffering mode. If this mode is disabled, the 00055 * data is written to the file within the blackboard data changed event, and 00056 * thus the writing operation can slow down the overall system, but memory 00057 * requirements are low. This is useful if a lot of data is written or if the 00058 * storage device is slow. If the mode is enabled, during the event the BB data 00059 * will be copied into another memory segment and the thread will be woken up. 00060 * Once the thread is running it stores all of the BB data segments bufferd 00061 * up to then. 00062 * The interface listener listens for events for a particular interface and 00063 * then writes the changes to the file. 00064 * @author Tim Niemueller 00065 */ 00066 00067 /** Constructor. 00068 * @param iface_uid interface UID which to log 00069 * @param logdir directory to store config files, must exist 00070 * @param buffering enable log buffering? 00071 * @param flushing true to flush after each written chunk 00072 * @param scenario ID of the log scenario 00073 * @param start_time time to use as start time for the log 00074 */ 00075 BBLoggerThread::BBLoggerThread(const char *iface_uid, 00076 const char *logdir, bool buffering, bool flushing, 00077 const char *scenario, fawkes::Time *start_time) 00078 : Thread("BBLoggerThread", Thread::OPMODE_WAITFORWAKEUP), 00079 BlackBoardInterfaceListener("BBLoggerThread(%s)", iface_uid) 00080 { 00081 set_coalesce_wakeups(true); 00082 set_name("BBLoggerThread(%s)", iface_uid); 00083 00084 __buffering = buffering; 00085 __flushing = flushing; 00086 __uid = strdup(iface_uid); 00087 __logdir = strdup(logdir); 00088 __scenario = strdup(scenario); 00089 __start = new Time(start_time); 00090 __filename = NULL; 00091 __queue_mutex = new Mutex(); 00092 __data_size = 0; 00093 __is_master = false; 00094 __enabled = true; 00095 00096 __now = NULL; 00097 00098 // Parse UID 00099 Interface::parse_uid(__uid, &__type, &__id); 00100 00101 char date[21]; 00102 Time now; 00103 struct tm *tmp = localtime(&(now.get_timeval()->tv_sec)); 00104 strftime(date, 21, "%F-%H-%M-%S", tmp); 00105 00106 if (asprintf(&__filename, "%s/%s-%s-%s-%s.log", LOGDIR, __scenario, 00107 __type, __id, date) == -1) { 00108 throw OutOfMemoryException("Cannot generate log name"); 00109 } 00110 } 00111 00112 00113 /** Destructor. */ 00114 BBLoggerThread::~BBLoggerThread() 00115 { 00116 free(__uid); 00117 free(__type); 00118 free(__id); 00119 free(__logdir); 00120 free(__scenario); 00121 free(__filename); 00122 delete __queue_mutex; 00123 delete __start; 00124 } 00125 00126 00127 void 00128 BBLoggerThread::init() 00129 { 00130 __queues[0].clear(); 00131 __queues[1].clear(); 00132 __act_queue = 0; 00133 00134 __queue_mutex = new Mutex(); 00135 __data_size = 0; 00136 00137 __now = NULL; 00138 __num_data_items = 0; 00139 __session_start = 0; 00140 00141 // use open because fopen does not provide O_CREAT | O_EXCL 00142 // open read/write because of usage of mmap 00143 mode_t m = S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH; 00144 int fd = open(__filename, O_RDWR | O_CREAT | O_EXCL, m); 00145 if ( ! fd ) { 00146 throw CouldNotOpenFileException(__filename, errno, "Failed to open log 1"); 00147 } else { 00148 __f_data = fdopen(fd, "w+"); 00149 if ( ! __f_data ) { 00150 throw CouldNotOpenFileException(__filename, errno, "Failed to open log 2"); 00151 } 00152 } 00153 00154 try { 00155 __iface = blackboard->open_for_reading(__type, __id); 00156 __data_size = __iface->datasize(); 00157 } catch (Exception &e) { 00158 fclose(__f_data); 00159 throw; 00160 } 00161 00162 try { 00163 write_header(); 00164 } catch (FileWriteException &e) { 00165 blackboard->close(__iface); 00166 fclose(__f_data); 00167 throw; 00168 } 00169 00170 __now = new Time(clock); 00171 00172 if (__is_master) { 00173 try { 00174 __switch_if = blackboard->open_for_writing<SwitchInterface>("BBLogger"); 00175 __switch_if->set_enabled(__enabled); 00176 __switch_if->write(); 00177 bbil_add_message_interface(__switch_if); 00178 } catch (Exception &e) { 00179 fclose(__f_data); 00180 throw; 00181 } 00182 } 00183 00184 bbil_add_data_interface(__iface); 00185 bbil_add_writer_interface(__iface); 00186 00187 blackboard->register_listener(this, BlackBoard::BBIL_FLAG_DATA | 00188 BlackBoard::BBIL_FLAG_WRITER | 00189 BlackBoard::BBIL_FLAG_MESSAGES); 00190 00191 logger->log_info(name(), "Logging %s to %s%s", __iface->uid(), __filename, 00192 __is_master ? " as master" : ""); 00193 } 00194 00195 00196 void 00197 BBLoggerThread::finalize() 00198 { 00199 blackboard->unregister_listener(this); 00200 if (__is_master) { 00201 blackboard->close(__switch_if); 00202 } 00203 update_header(); 00204 fclose(__f_data); 00205 for (unsigned int q = 0; q < 2; ++q) { 00206 while (!__queues[q].empty()) { 00207 void *t = __queues[q].front(); 00208 free(t); 00209 __queues[q].pop(); 00210 } 00211 } 00212 delete __now; 00213 __now = NULL; 00214 } 00215 00216 00217 /** Get filename. 00218 * @return file name, valid after object instantiated, but before init() does not 00219 * mean that the file has been or can actually be opened 00220 */ 00221 const char * 00222 BBLoggerThread::get_filename() const 00223 { 00224 return __filename; 00225 } 00226 00227 00228 /** Enable or disable logging. 00229 * @param enabled true to enable logging, false to disable 00230 */ 00231 void 00232 BBLoggerThread::set_enabled(bool enabled) 00233 { 00234 if (enabled && !__enabled) { 00235 logger->log_info(name(), "Logging enabled", 00236 (__num_data_items - __session_start)); 00237 __session_start = __num_data_items; 00238 } else if (!enabled && __enabled) { 00239 logger->log_info(name(), "Logging disabled (wrote %u entries), flushing", 00240 (__num_data_items - __session_start)); 00241 update_header(); 00242 fflush(__f_data); 00243 } 00244 00245 __enabled = enabled; 00246 } 00247 00248 00249 /** Set threadlist and master status. 00250 * This copies the thread list and sets this thread as master thread. 00251 * If you intend to use this method you must do so before the thread is 00252 * initialized. You may only ever declare one thread as master. 00253 * @param thread_list list of threads to notify on enable/disable events 00254 */ 00255 void 00256 BBLoggerThread::set_threadlist(fawkes::ThreadList &thread_list) 00257 { 00258 __is_master = true; 00259 __threads = thread_list; 00260 } 00261 00262 void 00263 BBLoggerThread::write_header() 00264 { 00265 bblog_file_header header; 00266 memset(&header, 0, sizeof(header)); 00267 header.file_magic = htonl(BBLOGGER_FILE_MAGIC); 00268 header.file_version = htonl(BBLOGGER_FILE_VERSION); 00269 #if __BYTE_ORDER == __BIG_ENDIAN 00270 header.endianess = BBLOG_BIG_ENDIAN; 00271 #else 00272 header.endianess = BBLOG_LITTLE_ENDIAN; 00273 #endif 00274 header.num_data_items = __num_data_items; 00275 strncpy(header.scenario, (const char *)__scenario, BBLOG_SCENARIO_SIZE); 00276 strncpy(header.interface_type, __iface->type(), BBLOG_INTERFACE_TYPE_SIZE); 00277 strncpy(header.interface_id, __iface->id(), BBLOG_INTERFACE_ID_SIZE); 00278 memcpy(header.interface_hash, __iface->hash(), BBLOG_INTERFACE_HASH_SIZE); 00279 header.data_size = __iface->datasize(); 00280 long start_time_sec, start_time_usec; 00281 __start->get_timestamp(start_time_sec, start_time_usec); 00282 header.start_time_sec = start_time_sec; 00283 header.start_time_usec = start_time_usec; 00284 if (fwrite(&header, sizeof(header), 1, __f_data) != 1) { 00285 throw FileWriteException(__filename, "Failed to write header"); 00286 } 00287 fflush(__f_data); 00288 } 00289 00290 /** Updates the num_data_items field in the header. */ 00291 void 00292 BBLoggerThread::update_header() 00293 { 00294 // write updated num_data_items field 00295 #if _POSIX_MAPPED_FILES 00296 void *h = mmap(NULL, sizeof(bblog_file_header), PROT_WRITE, MAP_SHARED, 00297 fileno(__f_data), 0); 00298 if (h == MAP_FAILED) { 00299 logger->log_warn(name(), "Failed to mmap log (%s), " 00300 "not updating number of data items", 00301 strerror(errno)); 00302 } else { 00303 bblog_file_header *header = (bblog_file_header *)h; 00304 header->num_data_items = __num_data_items; 00305 munmap(h, sizeof(bblog_file_header)); 00306 } 00307 #else 00308 logger->log_warn(name(), "Memory mapped files not available, " 00309 "not updating number of data items on close"); 00310 #endif 00311 } 00312 00313 void 00314 BBLoggerThread::write_chunk(const void *chunk) 00315 { 00316 bblog_entry_header ehead; 00317 __now->stamp(); 00318 Time d = *__now - *__start; 00319 long rel_time_sec, rel_time_usec; 00320 d.get_timestamp(rel_time_sec, rel_time_usec); 00321 ehead.rel_time_sec = rel_time_sec; 00322 ehead.rel_time_usec = rel_time_usec; 00323 if ( (fwrite(&ehead, sizeof(ehead), 1, __f_data) == 1) && 00324 (fwrite(chunk, __data_size, 1, __f_data) == 1) ) { 00325 if (__flushing) fflush(__f_data); 00326 __num_data_items += 1; 00327 } else { 00328 logger->log_warn(name(), "Failed to write chunk"); 00329 } 00330 } 00331 00332 00333 void 00334 BBLoggerThread::loop() 00335 { 00336 unsigned int write_queue = __act_queue; 00337 __queue_mutex->lock(); 00338 __act_queue = 1 - __act_queue; 00339 __queue_mutex->unlock(); 00340 LockQueue<void *> &queue = __queues[write_queue]; 00341 //logger->log_debug(name(), "Writing %zu entries", queue.size()); 00342 while (! queue.empty() ) { 00343 void *c = queue.front(); 00344 write_chunk(c); 00345 free(c); 00346 queue.pop(); 00347 } 00348 } 00349 00350 bool 00351 BBLoggerThread::bb_interface_message_received(Interface *interface, 00352 Message *message) throw() 00353 { 00354 SwitchInterface::EnableSwitchMessage *enm; 00355 SwitchInterface::DisableSwitchMessage *dism; 00356 00357 bool enabled = true; 00358 if ((enm = dynamic_cast<SwitchInterface::EnableSwitchMessage *>(message)) != NULL) { 00359 enabled = true; 00360 } else if ((dism = dynamic_cast<SwitchInterface::DisableSwitchMessage *>(message)) != NULL) { 00361 enabled = false; 00362 } else { 00363 logger->log_debug(name(), "Unhandled message type: %s via %s", 00364 message->type(), interface->uid()); 00365 } 00366 00367 for (ThreadList::iterator i = __threads.begin(); i != __threads.end(); ++i) { 00368 BBLoggerThread *bblt = dynamic_cast<BBLoggerThread *>(*i); 00369 bblt->set_enabled(enabled); 00370 } 00371 00372 __switch_if->set_enabled(__enabled); 00373 __switch_if->write(); 00374 00375 return false; 00376 } 00377 00378 00379 void 00380 BBLoggerThread::bb_interface_data_changed(Interface *interface) throw() 00381 { 00382 if (!__enabled) return; 00383 00384 try { 00385 __iface->read(); 00386 00387 if ( __buffering ) { 00388 void *c = malloc(__iface->datasize()); 00389 memcpy(c, __iface->datachunk(), __iface->datasize()); 00390 __queue_mutex->lock(); 00391 __queues[__act_queue].push_locked(c); 00392 __queue_mutex->unlock(); 00393 wakeup(); 00394 } else { 00395 __queue_mutex->lock(); 00396 write_chunk(__iface->datachunk()); 00397 __queue_mutex->unlock(); 00398 } 00399 00400 } catch (Exception &e) { 00401 logger->log_error(name(), "Exception when data changed"); 00402 logger->log_error(name(), e); 00403 } 00404 } 00405 00406 void 00407 BBLoggerThread::bb_interface_writer_added(Interface *interface, 00408 unsigned int instance_serial) throw() 00409 { 00410 __session_start = __num_data_items; 00411 } 00412 00413 void 00414 BBLoggerThread::bb_interface_writer_removed(Interface *interface, 00415 unsigned int instance_serial) throw() 00416 { 00417 logger->log_info(name(), "Writer removed (wrote %u entries), flushing", 00418 (__num_data_items - __session_start)); 00419 update_header(); 00420 fflush(__f_data); 00421 }