• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdepimlibs-4.8.3 API Reference
  • KDE Home
  • Contact Us
 

akonadi

session.cpp
00001 /*
00002     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00003 
00004     This library is free software; you can redistribute it and/or modify it
00005     under the terms of the GNU Library General Public License as published by
00006     the Free Software Foundation; either version 2 of the License, or (at your
00007     option) any later version.
00008 
00009     This library is distributed in the hope that it will be useful, but WITHOUT
00010     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00011     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00012     License for more details.
00013 
00014     You should have received a copy of the GNU Library General Public License
00015     along with this library; see the file COPYING.LIB.  If not, write to the
00016     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00017     02110-1301, USA.
00018 */
00019 
00020 #include "session.h"
00021 #include "session_p.h"
00022 
00023 #include "imapparser_p.h"
00024 #include "job.h"
00025 #include "job_p.h"
00026 #include "servermanager.h"
00027 #include "servermanager_p.h"
00028 #include "xdgbasedirs_p.h"
00029 
00030 #include <kdebug.h>
00031 #include <klocale.h>
00032 
00033 #include <QCoreApplication>
00034 #include <QtCore/QDir>
00035 #include <QtCore/QQueue>
00036 #include <QtCore/QThreadStorage>
00037 #include <QtCore/QTimer>
00038 #include <QtCore/QThread>
00039 #include <QSettings>
00040 
00041 #include <QtNetwork/QLocalSocket>
00042 #include <QtNetwork/QTcpSocket>
00043 #include <QtNetwork/QHostAddress>
00044 
00045 // ### FIXME pipelining got broken by switching result emission in JobPrivate::handleResponse to delayed emission
00046 // in order to work around exec() deadlocks. As a result of that Session knows to late about a finished job and still
00047 // sends responses for the next one to the already finished one
00048 #define PIPELINE_LENGTH 0
00049 //#define PIPELINE_LENGTH 2
00050 
00051 using namespace Akonadi;
00052 
00053 
00054 //@cond PRIVATE
00055 
00056 void SessionPrivate::startNext()
00057 {
00058   QTimer::singleShot( 0, mParent, SLOT(doStartNext()) );
00059 }
00060 
00061 void SessionPrivate::reconnect()
00062 {
00063   QLocalSocket *localSocket = qobject_cast<QLocalSocket*>( socket );
00064   if ( localSocket && (localSocket->state() == QLocalSocket::ConnectedState
00065                        || localSocket->state() == QLocalSocket::ConnectingState ) ) {
00066     // nothing to do, we are still/already connected
00067     return;
00068   }
00069 
00070   QTcpSocket *tcpSocket = qobject_cast<QTcpSocket*>( socket );
00071   if ( tcpSocket && (tcpSocket->state() == QTcpSocket::ConnectedState
00072                      || tcpSocket->state() == QTcpSocket::ConnectingState ) ) {
00073     // same here, but for TCP
00074     return;
00075   }
00076 
00077   // try to figure out where to connect to
00078   QString serverAddress;
00079   quint16 port = 0;
00080   bool useTcp = false;
00081 
00082   // env var has precedence
00083   const QByteArray serverAddressEnvVar = qgetenv( "AKONADI_SERVER_ADDRESS" );
00084   if ( !serverAddressEnvVar.isEmpty() ) {
00085     const int pos = serverAddressEnvVar.indexOf( ':' );
00086     const QByteArray protocol = serverAddressEnvVar.left( pos  );
00087     QMap<QString, QString> options;
00088     foreach ( const QString &entry, QString::fromLatin1( serverAddressEnvVar.mid( pos + 1 ) ).split( QLatin1Char(',') ) ) {
00089       const QStringList pair = entry.split( QLatin1Char('=') );
00090       if ( pair.size() != 2 )
00091         continue;
00092       options.insert( pair.first(), pair.last() );
00093     }
00094     kDebug() << protocol << options;
00095 
00096     if ( protocol == "tcp" ) {
00097       serverAddress = options.value( QLatin1String( "host" ) );
00098       port = options.value( QLatin1String( "port" ) ).toUInt();
00099       useTcp = true;
00100     } else if ( protocol == "unix" ) {
00101       serverAddress = options.value( QLatin1String( "path" ) );
00102     } else if ( protocol == "pipe" ) {
00103       serverAddress = options.value( QLatin1String( "name" ) );
00104     }
00105   }
00106 
00107   // try config file next, fall back to defaults if that fails as well
00108   if ( serverAddress.isEmpty() ) {
00109     const QString connectionConfigFile = XdgBaseDirs::akonadiConnectionConfigFile();
00110     const QFileInfo fileInfo( connectionConfigFile );
00111     if ( !fileInfo.exists() ) {
00112       kDebug() << "Akonadi Client Session: connection config file '"
00113                   "akonadi/akonadiconnectionrc' can not be found in"
00114                << XdgBaseDirs::homePath( "config" ) << "nor in any of"
00115                << XdgBaseDirs::systemPathList( "config" );
00116     }
00117     const QSettings connectionSettings( connectionConfigFile, QSettings::IniFormat );
00118 
00119 #ifdef Q_OS_WIN  //krazy:exclude=cpp
00120     serverAddress = connectionSettings.value( QLatin1String( "Data/NamedPipe" ), QLatin1String( "Akonadi" ) ).toString();
00121 #else
00122     const QString defaultSocketDir = XdgBaseDirs::saveDir( "data", QLatin1String( "akonadi" ) );
00123     serverAddress = connectionSettings.value( QLatin1String( "Data/UnixPath" ), QString(defaultSocketDir + QLatin1String( "/akonadiserver.socket" )) ).toString();
00124 #endif
00125   }
00126 #ifdef Q_OS_WINCE
00127   useTcp = true;
00128 #endif
00129 
00130   // create sockets if not yet done, note that this does not yet allow changing socket types on the fly
00131   // but that's probably not something we need to support anyway
00132   if ( !socket ) {
00133     if ( !useTcp ) {
00134       socket = localSocket = new QLocalSocket( mParent );
00135       mParent->connect( localSocket, SIGNAL(error(QLocalSocket::LocalSocketError)), SLOT(socketError(QLocalSocket::LocalSocketError)) );
00136     } else {
00137       socket = tcpSocket = new QTcpSocket( mParent );
00138       mParent->connect( tcpSocket, SIGNAL(error(QAbstractSocket::SocketError)), SLOT(socketError(QAbstractSocket::SocketError)) );
00139     }
00140     mParent->connect( socket, SIGNAL(disconnected()), SLOT(socketDisconnected()) );
00141     mParent->connect( socket, SIGNAL(readyRead()), SLOT(dataReceived()) );
00142   }
00143 
00144   // actually do connect
00145   kDebug() << "connectToServer" << serverAddress;
00146 #ifdef Q_OS_WINCE
00147     tcpSocket->connectToHost( QHostAddress::LocalHost, 31414 );
00148 #else
00149   if ( !useTcp ) {
00150     localSocket->connectToServer( serverAddress );
00151   } else {
00152     tcpSocket->connectToHost( serverAddress, port );
00153   }
00154 #endif
00155 
00156   emit mParent->reconnected();
00157 }
00158 
00159 void SessionPrivate::socketError( QLocalSocket::LocalSocketError )
00160 {
00161   Q_ASSERT( mParent->sender() == socket );
00162   kWarning() << "Socket error occurred:" << qobject_cast<QLocalSocket*>( socket )->errorString();
00163   socketDisconnected();
00164 }
00165 
00166 void SessionPrivate::socketError( QAbstractSocket::SocketError )
00167 {
00168   Q_ASSERT( mParent->sender() == socket );
00169   kWarning() << "Socket error occurred:" << qobject_cast<QTcpSocket*>( socket )->errorString();
00170   socketDisconnected();
00171 }
00172 
00173 void SessionPrivate::socketDisconnected()
00174 {
00175   if ( currentJob )
00176     currentJob->d_ptr->lostConnection();
00177   connected = false;
00178 }
00179 
00180 void SessionPrivate::dataReceived()
00181 {
00182   while ( socket->bytesAvailable() > 0 ) {
00183     if ( parser->continuationSize() > 1 ) {
00184       const QByteArray data = socket->read( qMin( socket->bytesAvailable(), parser->continuationSize() - 1 ) );
00185       parser->parseBlock( data );
00186     } else if ( socket->canReadLine() ) {
00187       if ( !parser->parseNextLine( socket->readLine() ) )
00188         continue; // response not yet completed
00189 
00190       // handle login response
00191       if ( parser->tag() == QByteArray( "0" ) ) {
00192         if ( parser->data().startsWith( "OK" ) ) { //krazy:exclude=strings
00193           connected = true;
00194           startNext();
00195         } else {
00196           kWarning() << "Unable to login to Akonadi server:" << parser->data();
00197           socket->close();
00198           QTimer::singleShot( 1000, mParent, SLOT(reconnect()) );
00199         }
00200       }
00201 
00202       // send login command
00203       if ( parser->tag() == "*" && parser->data().startsWith( "OK Akonadi" ) ) {
00204         const int pos = parser->data().indexOf( "[PROTOCOL" );
00205         if ( pos > 0 ) {
00206           qint64 tmp = 0;
00207           ImapParser::parseNumber( parser->data(), tmp, 0, pos + 9 );
00208           protocolVersion = tmp;
00209           Internal::setServerProtocolVersion( tmp );
00210         }
00211         kDebug() << "Server protocol version is:" << protocolVersion;
00212 
00213         writeData( "0 LOGIN " + ImapParser::quote( sessionId ) + '\n' );
00214 
00215       // work for the current job
00216       } else {
00217         if ( currentJob )
00218           currentJob->d_ptr->handleResponse( parser->tag(), parser->data() );
00219       }
00220 
00221       // reset parser stuff
00222       parser->reset();
00223     } else {
00224       break; // nothing we can do for now
00225     }
00226   }
00227 }
00228 
00229 bool SessionPrivate::canPipelineNext()
00230 {
00231   if ( queue.isEmpty() || pipeline.count() >= PIPELINE_LENGTH )
00232     return false;
00233   if ( pipeline.isEmpty() && currentJob )
00234     return currentJob->d_ptr->mWriteFinished;
00235   if ( !pipeline.isEmpty() )
00236     return pipeline.last()->d_ptr->mWriteFinished;
00237   return false;
00238 }
00239 
00240 void SessionPrivate::doStartNext()
00241 {
00242   if ( !connected || (queue.isEmpty() && pipeline.isEmpty()) )
00243     return;
00244   if ( canPipelineNext() ) {
00245     Akonadi::Job *nextJob = queue.dequeue();
00246     pipeline.enqueue( nextJob );
00247     startJob( nextJob );
00248   }
00249   if ( jobRunning )
00250     return;
00251   jobRunning = true;
00252   if ( !pipeline.isEmpty() ) {
00253     currentJob = pipeline.dequeue();
00254   } else {
00255     currentJob = queue.dequeue();
00256     startJob( currentJob );
00257   }
00258 }
00259 
00260 void SessionPrivate::startJob( Job *job )
00261 {
00262   if ( protocolVersion < minimumProtocolVersion() ) {
00263     job->setError( Job::ProtocolVersionMismatch );
00264     job->setErrorText( i18n( "Protocol version %1 found, expected at least %2", protocolVersion, minimumProtocolVersion() ) );
00265     job->emitResult();
00266   } else {
00267     job->d_ptr->startQueued();
00268   }
00269 }
00270 
00271 void SessionPrivate::endJob( Job *job )
00272 {
00273   job->emitResult();
00274 }
00275 
00276 void SessionPrivate::jobDone(KJob * job)
00277 {
00278   // ### careful, this method can be called from the QObject dtor of job (see jobDestroyed() below)
00279   // so don't call any methods on job itself
00280   if ( job == currentJob ) {
00281     if ( pipeline.isEmpty() ) {
00282       jobRunning = false;
00283       currentJob = 0;
00284     } else {
00285       currentJob = pipeline.dequeue();
00286     }
00287     startNext();
00288   } else {
00289     // non-current job finished, likely canceled while still in the queue
00290     queue.removeAll( static_cast<Akonadi::Job*>( job ) );
00291     // ### likely not enough to really cancel already running jobs
00292     pipeline.removeAll( static_cast<Akonadi::Job*>( job ) );
00293   }
00294 }
00295 
00296 void SessionPrivate::jobWriteFinished( Akonadi::Job* job )
00297 {
00298   Q_ASSERT( (job == currentJob && pipeline.isEmpty()) || (job = pipeline.last()) );
00299   Q_UNUSED( job );
00300 
00301   startNext();
00302 }
00303 
00304 void SessionPrivate::jobDestroyed(QObject * job)
00305 {
00306   // careful, accessing non-QObject methods of job will fail here already
00307   jobDone( static_cast<KJob*>( job ) );
00308 }
00309 
00310 void SessionPrivate::addJob(Job * job)
00311 {
00312   queue.append( job );
00313   QObject::connect( job, SIGNAL(result(KJob*)), mParent, SLOT(jobDone(KJob*)) );
00314   QObject::connect( job, SIGNAL(writeFinished(Akonadi::Job*)), mParent, SLOT(jobWriteFinished(Akonadi::Job*)) );
00315   QObject::connect( job, SIGNAL(destroyed(QObject*)), mParent, SLOT(jobDestroyed(QObject*)) );
00316   startNext();
00317 }
00318 
00319 int SessionPrivate::nextTag()
00320 {
00321   return theNextTag++;
00322 }
00323 
00324 void SessionPrivate::writeData(const QByteArray & data)
00325 {
00326   if ( socket )
00327     socket->write( data );
00328   else
00329     kWarning() << "Trying to write while session is disconnected!" << kBacktrace();
00330 }
00331 
00332 void SessionPrivate::serverStateChanged( ServerManager::State state )
00333 {
00334   if ( state == ServerManager::Running && !connected )
00335     reconnect();
00336 }
00337 
00338 void SessionPrivate::itemRevisionChanged( Akonadi::Item::Id itemId, int oldRevision, int newRevision )
00339 {
00340   // only deal with the queue, for the guys in the pipeline it's too late already anyway
00341   // and they shouldn't have gotten there if they depend on a preceding job anyway.
00342   foreach ( Job *job, queue )
00343     job->d_ptr->updateItemRevision( itemId, oldRevision, newRevision );
00344 }
00345 
00346 //@endcond
00347 
00348 
00349 SessionPrivate::SessionPrivate( Session *parent )
00350     : mParent( parent ), socket( 0 ), protocolVersion( 0 ), currentJob( 0 ), parser( 0 )
00351 {
00352 }
00353 
00354 void SessionPrivate::init( const QByteArray &id )
00355 {
00356   kDebug() << id;
00357   parser = new ImapParser();
00358 
00359   if ( !id.isEmpty() ) {
00360     sessionId = id;
00361   } else {
00362     sessionId = QCoreApplication::instance()->applicationName().toUtf8()
00363         + '-' + QByteArray::number( qrand() );
00364   }
00365 
00366   connected = false;
00367   theNextTag = 1;
00368   jobRunning = false;
00369 
00370   if ( ServerManager::state() == ServerManager::NotRunning )
00371     ServerManager::start();
00372   mParent->connect( ServerManager::self(), SIGNAL(stateChanged(Akonadi::ServerManager::State)),
00373                     SLOT(serverStateChanged(Akonadi::ServerManager::State)) );
00374 
00375   reconnect();
00376 }
00377 
00378 Session::Session(const QByteArray & sessionId, QObject * parent) :
00379     QObject( parent ),
00380     d( new SessionPrivate( this ) )
00381 {
00382   d->init( sessionId );
00383 }
00384 
00385 Session::Session( SessionPrivate *dd, const QByteArray & sessionId, QObject * parent)
00386     : QObject( parent ),
00387     d( dd )
00388 {
00389   d->init( sessionId );
00390 }
00391 
00392 Session::~Session()
00393 {
00394   clear();
00395   delete d;
00396 }
00397 
00398 QByteArray Session::sessionId() const
00399 {
00400   return d->sessionId;
00401 }
00402 
00403 static QThreadStorage<Session*> instances;
00404 
00405 void SessionPrivate::createDefaultSession( const QByteArray &sessionId )
00406 {
00407   Q_ASSERT_X( !sessionId.isEmpty(), "SessionPrivate::createDefaultSession",
00408               "You tried to create a default session with empty session id!" );
00409   Q_ASSERT_X( !instances.hasLocalData(), "SessionPrivate::createDefaultSession",
00410               "You tried to create a default session twice!" );
00411 
00412   instances.setLocalData( new Session( sessionId ) );
00413 }
00414 
00415 Session* Session::defaultSession()
00416 {
00417   if ( !instances.hasLocalData() )
00418     instances.setLocalData( new Session() );
00419   return instances.localData();
00420 }
00421 
00422 void Session::clear()
00423 {
00424   foreach ( Job* job, d->queue )
00425     job->kill( KJob::EmitResult );
00426   d->queue.clear();
00427   foreach ( Job* job, d->pipeline )
00428     job->kill( KJob::EmitResult );
00429   d->pipeline.clear();
00430   if ( d->currentJob )
00431     d->currentJob->kill( KJob::EmitResult );
00432   d->jobRunning = false;
00433   d->connected = false;
00434   if ( d->socket )
00435       d->socket->disconnect( this ); // prevent signal emitted from close() causing mayhem - we might be called from ~QThreadStorage!
00436   delete d->socket;
00437   d->socket = 0;
00438   QMetaObject::invokeMethod( this, "reconnect", Qt::QueuedConnection ); // avoids reconnecting in the dtor
00439 }
00440 
00441 #include "session.moc"
This file is part of the KDE documentation.
Documentation copyright © 1996-2012 The KDE developers.
Generated on Mon May 14 2012 04:53:00 by doxygen 1.7.5 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs-4.8.3 API Reference

Skip menu "kdepimlibs-4.8.3 API Reference"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal