• Skip to content
  • Skip to link menu
  • KDE API Reference
  • kdepimlibs-4.8.3 API Reference
  • KDE Home
  • Contact Us
 

akonadi

itemsync.cpp
00001 /*
00002     Copyright (c) 2007 Tobias Koenig <tokoe@kde.org>
00003     Copyright (c) 2007 Volker Krause <vkrause@kde.org>
00004 
00005     This library is free software; you can redistribute it and/or modify it
00006     under the terms of the GNU Library General Public License as published by
00007     the Free Software Foundation; either version 2 of the License, or (at your
00008     option) any later version.
00009 
00010     This library is distributed in the hope that it will be useful, but WITHOUT
00011     ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
00012     FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Library General Public
00013     License for more details.
00014 
00015     You should have received a copy of the GNU Library General Public License
00016     along with this library; see the file COPYING.LIB.  If not, write to the
00017     Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
00018     02110-1301, USA.
00019 */
00020 
00021 #include "itemsync.h"
00022 
00023 #include "collection.h"
00024 #include "item.h"
00025 #include "item_p.h"
00026 #include "itemcreatejob.h"
00027 #include "itemdeletejob.h"
00028 #include "itemfetchjob.h"
00029 #include "itemmodifyjob.h"
00030 #include "transactionsequence.h"
00031 #include "itemfetchscope.h"
00032 
00033 #include <kdebug.h>
00034 
00035 #include <QtCore/QStringList>
00036 
00037 using namespace Akonadi;
00038 
00042 class ItemSync::Private
00043 {
00044   public:
00045     Private( ItemSync *parent ) :
00046       q( parent ),
00047       mTransactionMode( SingleTransaction ),
00048       mCurrentTransaction( 0 ),
00049       mTransactionJobs( 0 ),
00050       mPendingJobs( 0 ),
00051       mProgress( 0 ),
00052       mTotalItems( -1 ),
00053       mTotalItemsProcessed( 0 ),
00054       mStreaming( false ),
00055       mIncremental( false ),
00056       mLocalListDone( false ),
00057       mDeliveryDone( false ),
00058       mFinished( false )
00059     {
00060       // we want to fetch all data by default
00061       mFetchScope.fetchFullPayload();
00062       mFetchScope.fetchAllAttributes();
00063     }
00064 
00065     void createLocalItem( const Item &item );
00066     void checkDone();
00067     void slotLocalListDone( KJob* );
00068     void slotLocalDeleteDone( KJob* );
00069     void slotLocalChangeDone( KJob* );
00070     void execute();
00071     void processItems();
00072     void deleteItems( const Item::List &items );
00073     void slotTransactionResult( KJob *job );
00074     Job* subjobParent() const;
00075 
00076     ItemSync *q;
00077     Collection mSyncCollection;
00078     QHash<Item::Id, Akonadi::Item> mLocalItemsById;
00079     QHash<QString, Akonadi::Item> mLocalItemsByRemoteId;
00080     QSet<Akonadi::Item> mUnprocessedLocalItems;
00081 
00082     ItemSync::TransactionMode mTransactionMode;
00083     TransactionSequence *mCurrentTransaction;
00084     int mTransactionJobs;
00085 
00086     // fetch scope for initial item listing
00087     ItemFetchScope mFetchScope;
00088 
00089     // remote items
00090     Akonadi::Item::List mRemoteItems;
00091 
00092     // removed remote items
00093     Item::List mRemovedRemoteItems;
00094 
00095     // create counter
00096     int mPendingJobs;
00097     int mProgress;
00098     int mTotalItems;
00099     int mTotalItemsProcessed;
00100 
00101     bool mStreaming;
00102     bool mIncremental;
00103     bool mLocalListDone;
00104     bool mDeliveryDone;
00105     bool mFinished;
00106 };
00107 
00108 void ItemSync::Private::createLocalItem( const Item & item )
00109 {
00110   // don't try to do anything in error state
00111   if ( q->error() )
00112     return;
00113   mPendingJobs++;
00114   ItemCreateJob *create = new ItemCreateJob( item, mSyncCollection, subjobParent() );
00115   q->connect( create, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
00116 }
00117 
00118 void ItemSync::Private::checkDone()
00119 {
00120   q->setProcessedAmount( KJob::Bytes, mProgress );
00121   if ( mPendingJobs > 0 || !mDeliveryDone || mTransactionJobs > 0 )
00122     return;
00123 
00124   if ( !mFinished ) { // prevent double result emission, can happen since checkDone() is called from all over the place
00125     mFinished = true;
00126     q->emitResult();
00127   }
00128 }
00129 
00130 ItemSync::ItemSync( const Collection &collection, QObject *parent ) :
00131     Job( parent ),
00132     d( new Private( this ) )
00133 {
00134   d->mSyncCollection = collection;
00135 }
00136 
00137 ItemSync::~ItemSync()
00138 {
00139   delete d;
00140 }
00141 
00142 void ItemSync::setFullSyncItems( const Item::List &items )
00143 {
00144   Q_ASSERT( !d->mIncremental );
00145   if ( !d->mStreaming )
00146     d->mDeliveryDone = true;
00147   d->mRemoteItems += items;
00148   d->mTotalItemsProcessed += items.count();
00149   kDebug() << "Received: " << items.count() << "In total: " << d->mTotalItemsProcessed << " Wanted: " << d->mTotalItems;
00150   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00151   if ( d->mTotalItemsProcessed == d->mTotalItems )
00152     d->mDeliveryDone = true;
00153   d->execute();
00154 }
00155 
00156 void ItemSync::setTotalItems( int amount )
00157 {
00158   Q_ASSERT( !d->mIncremental );
00159   Q_ASSERT( amount >= 0 );
00160   setStreamingEnabled( true );
00161   kDebug() << amount;
00162   d->mTotalItems = amount;
00163   setTotalAmount( KJob::Bytes, amount );
00164   if ( d->mTotalItems == 0 ) {
00165     d->mDeliveryDone = true;
00166     d->execute();
00167   }
00168 }
00169 
00170 void ItemSync::setIncrementalSyncItems( const Item::List &changedItems, const Item::List &removedItems )
00171 {
00172   d->mIncremental = true;
00173   if ( !d->mStreaming )
00174     d->mDeliveryDone = true;
00175   d->mRemoteItems += changedItems;
00176   d->mRemovedRemoteItems += removedItems;
00177   d->mTotalItemsProcessed += changedItems.count() + removedItems.count();
00178   setTotalAmount( KJob::Bytes, d->mTotalItemsProcessed );
00179   if ( d->mTotalItemsProcessed == d->mTotalItems )
00180     d->mDeliveryDone = true;
00181   d->execute();
00182 }
00183 
00184 void ItemSync::setFetchScope( ItemFetchScope &fetchScope )
00185 {
00186   d->mFetchScope = fetchScope;
00187 }
00188 
00189 ItemFetchScope &ItemSync::fetchScope()
00190 {
00191   return d->mFetchScope;
00192 }
00193 
00194 void ItemSync::doStart()
00195 {
00196   ItemFetchJob* job = new ItemFetchJob( d->mSyncCollection, this );
00197   job->setFetchScope( d->mFetchScope );
00198 
00199   // we only can fetch parts already in the cache, otherwise this will deadlock
00200   job->fetchScope().setCacheOnly( true );
00201 
00202   connect( job, SIGNAL(result(KJob*)), SLOT(slotLocalListDone(KJob*)) );
00203 }
00204 
00205 bool ItemSync::updateItem( const Item &storedItem, Item &newItem )
00206 {
00207   // we are in error state, better not change anything at all anymore
00208   if ( error() )
00209     return false;
00210 
00211   /*
00212    * We know that this item has changed (as it is part of the
00213    * incremental changed list), so we just put it into the
00214    * storage.
00215    */
00216   if ( d->mIncremental )
00217     return true;
00218 
00219   if ( newItem.d_func()->mClearPayload )
00220     return true;
00221 
00222   // Check whether the remote revisions differ
00223   if ( storedItem.remoteRevision() != newItem.remoteRevision() )
00224     return true;
00225 
00226   // Check whether the flags differ
00227   if ( storedItem.flags() != newItem.flags() ) {
00228     kDebug() << "Stored flags "  << storedItem.flags()
00229              << "new flags " << newItem.flags();
00230     return true;
00231   }
00232 
00233   // Check whether the new item contains unknown parts
00234   QSet<QByteArray> missingParts = newItem.loadedPayloadParts();
00235   missingParts.subtract( storedItem.loadedPayloadParts() );
00236   if ( !missingParts.isEmpty() )
00237     return true;
00238 
00239   // ### FIXME SLOW!!!
00240   // If the available part identifiers don't differ, check
00241   // whether the content of the payload differs
00242   if ( newItem.hasPayload()
00243     && storedItem.payloadData() != newItem.payloadData() )
00244     return true;
00245 
00246   // check if remote attributes have been changed
00247   foreach ( Attribute* attr, newItem.attributes() ) {
00248     if ( !storedItem.hasAttribute( attr->type() ) )
00249       return true;
00250     if ( attr->serialized() != storedItem.attribute( attr->type() )->serialized() )
00251       return true;
00252   }
00253 
00254   return false;
00255 }
00256 
00257 void ItemSync::Private::slotLocalListDone( KJob * job )
00258 {
00259   if ( !job->error() ) {
00260     const Item::List list = static_cast<ItemFetchJob*>( job )->items();
00261     foreach ( const Item &item, list ) {
00262       if ( item.remoteId().isEmpty() )
00263         continue;
00264       mLocalItemsById.insert( item.id(), item );
00265       mLocalItemsByRemoteId.insert( item.remoteId(), item );
00266       mUnprocessedLocalItems.insert( item );
00267     }
00268   }
00269 
00270   mLocalListDone = true;
00271   execute();
00272 }
00273 
00274 void ItemSync::Private::execute()
00275 {
00276   if ( !mLocalListDone )
00277     return;
00278 
00279   // early exit to avoid unnecessary TransactionSequence creation in MultipleTransactions mode
00280   // TODO: do the transaction handling in a nicer way instead, only creating TransactionSequences when really needed
00281   if ( !mDeliveryDone && mRemoteItems.isEmpty() )
00282     return;
00283 
00284   if ( (mTransactionMode == SingleTransaction && !mCurrentTransaction) || mTransactionMode == MultipleTransactions) {
00285     ++mTransactionJobs;
00286     mCurrentTransaction = new TransactionSequence( q );
00287     mCurrentTransaction->setAutomaticCommittingEnabled( false );
00288     connect( mCurrentTransaction, SIGNAL(result(KJob*)), q, SLOT(slotTransactionResult(KJob*)) );
00289   }
00290 
00291   processItems();
00292   if ( !mDeliveryDone ) {
00293     if ( mTransactionMode == MultipleTransactions && mCurrentTransaction ) {
00294       mCurrentTransaction->commit();
00295       mCurrentTransaction = 0;
00296     }
00297     return;
00298   }
00299 
00300   // removed
00301   if ( !mIncremental ) {
00302     mRemovedRemoteItems = mUnprocessedLocalItems.toList();
00303     mUnprocessedLocalItems.clear();
00304   }
00305 
00306   deleteItems( mRemovedRemoteItems );
00307   mLocalItemsById.clear();
00308   mLocalItemsByRemoteId.clear();
00309   mRemovedRemoteItems.clear();
00310 
00311   if ( mCurrentTransaction ) {
00312     mCurrentTransaction->commit();
00313     mCurrentTransaction = 0;
00314   }
00315 
00316   checkDone();
00317 }
00318 
00319 void ItemSync::Private::processItems()
00320 {
00321   // added / updated
00322   foreach ( Item remoteItem, mRemoteItems ) { //krazy:exclude=foreach non-const is needed here
00323 #ifndef NDEBUG
00324     if ( remoteItem.remoteId().isEmpty() ) {
00325       kWarning() << "Item " << remoteItem.id() << " does not have a remote identifier";
00326     }
00327 #endif
00328 
00329     Item localItem = mLocalItemsById.value( remoteItem.id() );
00330     if ( !localItem.isValid() )
00331       localItem = mLocalItemsByRemoteId.value( remoteItem.remoteId() );
00332     mUnprocessedLocalItems.remove( localItem );
00333     // missing locally
00334     if ( !localItem.isValid() ) {
00335       createLocalItem( remoteItem );
00336       continue;
00337     }
00338 
00339     if ( q->updateItem( localItem, remoteItem ) ) {
00340       mPendingJobs++;
00341 
00342       remoteItem.setId( localItem.id() );
00343       remoteItem.setRevision( localItem.revision() );
00344       remoteItem.setSize( localItem.size() );
00345       remoteItem.setRemoteId( localItem.remoteId() );  // in case someone clears remoteId by accident
00346       ItemModifyJob *mod = new ItemModifyJob( remoteItem, subjobParent() );
00347       mod->disableRevisionCheck();
00348       q->connect( mod, SIGNAL(result(KJob*)), q, SLOT(slotLocalChangeDone(KJob*)) );
00349     } else {
00350       mProgress++;
00351     }
00352   }
00353   mRemoteItems.clear();
00354 }
00355 
00356 void ItemSync::Private::deleteItems( const Item::List &items )
00357 {
00358   // if in error state, better not change anything anymore
00359   if ( q->error() )
00360     return;
00361 
00362   Item::List itemsToDelete;
00363   foreach ( const Item &item, items ) {
00364     Item delItem( item );
00365     if ( !item.isValid() ) {
00366       delItem = mLocalItemsByRemoteId.value( item.remoteId() );
00367     }
00368 
00369     if ( !delItem.isValid() ) {
00370 #ifndef NDEBUG
00371       kWarning() << "Delete item (remoteeId=" << item.remoteId()
00372                  << "mimeType=" << item.mimeType()
00373                  << ") does not have a valid UID and no item with that remote ID exists either";
00374 #endif
00375       continue;
00376     }
00377 
00378     if ( delItem.remoteId().isEmpty() ) {
00379       // don't attempt to remove items that never were written to the backend
00380       continue;
00381     }
00382 
00383     itemsToDelete.append ( delItem );
00384   }
00385 
00386   if ( !itemsToDelete.isEmpty() ) {
00387     mPendingJobs++;
00388     ItemDeleteJob *job = new ItemDeleteJob( itemsToDelete, subjobParent() );
00389     q->connect( job, SIGNAL(result(KJob*)), q, SLOT(slotLocalDeleteDone(KJob*)) );
00390 
00391     // It can happen that the groupware servers report us deleted items
00392     // twice, in this case this item delete job will fail on the second try.
00393     // To avoid a rollback of the complete transaction we gracefully allow the job
00394     // to fail :)
00395     TransactionSequence *transaction = qobject_cast<TransactionSequence*>( subjobParent() );
00396     if ( transaction )
00397       transaction->setIgnoreJobFailure( job );
00398   }
00399 }
00400 
00401 void ItemSync::Private::slotLocalDeleteDone( KJob* )
00402 {
00403   mPendingJobs--;
00404   mProgress++;
00405 
00406   checkDone();
00407 }
00408 
00409 void ItemSync::Private::slotLocalChangeDone( KJob * job )
00410 {
00411   Q_UNUSED( job );
00412   mPendingJobs--;
00413   mProgress++;
00414 
00415   checkDone();
00416 }
00417 
00418 void ItemSync::Private::slotTransactionResult( KJob *job )
00419 {
00420   --mTransactionJobs;
00421   if ( mCurrentTransaction == job )
00422     mCurrentTransaction = 0;
00423 
00424   checkDone();
00425 }
00426 
00427 Job * ItemSync::Private::subjobParent() const
00428 {
00429   if ( mCurrentTransaction && mTransactionMode != NoTransaction )
00430     return mCurrentTransaction;
00431   return q;
00432 }
00433 
00434 void ItemSync::setStreamingEnabled(bool enable)
00435 {
00436   d->mStreaming = enable;
00437 }
00438 
00439 void ItemSync::deliveryDone()
00440 {
00441   Q_ASSERT( d->mStreaming );
00442   d->mDeliveryDone = true;
00443   d->execute();
00444 }
00445 
00446 void ItemSync::slotResult(KJob* job)
00447 {
00448   if ( job->error() ) {
00449     // pretent there were no errors
00450     Akonadi::Job::removeSubjob( job );
00451     // propagate the first error we got but continue, we might still be fed with stuff from a resource
00452     if ( !error() ) {
00453       setError( job->error() );
00454       setErrorText( job->errorText() );
00455     }
00456   } else {
00457     Akonadi::Job::slotResult( job );
00458   }
00459 }
00460 
00461 void ItemSync::rollback()
00462 {
00463   setError( UserCanceled );
00464   if ( d->mCurrentTransaction )
00465     d->mCurrentTransaction->rollback();
00466   d->mDeliveryDone = true; // user wont deliver more data
00467   d->execute(); // end this in an ordered way, since we have an error set no real change will be done
00468 }
00469 
00470 void ItemSync::setTransactionMode(ItemSync::TransactionMode mode)
00471 {
00472   d->mTransactionMode = mode;
00473 }
00474 
00475 
00476 #include "itemsync.moc"
This file is part of the KDE documentation.
Documentation copyright © 1996-2012 The KDE developers.
Generated on Mon May 14 2012 04:52:57 by doxygen 1.7.5 written by Dimitri van Heesch, © 1997-2006

KDE's Doxygen guidelines are available online.

akonadi

Skip menu "akonadi"
  • Main Page
  • Namespace List
  • Namespace Members
  • Alphabetical List
  • Class List
  • Class Hierarchy
  • Class Members
  • File List
  • Modules
  • Related Pages

kdepimlibs-4.8.3 API Reference

Skip menu "kdepimlibs-4.8.3 API Reference"
  • akonadi
  •   contact
  •   kmime
  • kabc
  • kalarmcal
  • kblog
  • kcal
  • kcalcore
  • kcalutils
  • kholidays
  • kimap
  • kioslave
  •   imap4
  •   mbox
  •   nntp
  • kldap
  • kmbox
  • kmime
  • kontactinterface
  • kpimidentities
  • kpimtextedit
  •   richtextbuilders
  • kpimutils
  • kresources
  • ktnef
  • kxmlrpcclient
  • mailtransport
  • microblog
  • qgpgme
  • syndication
  •   atom
  •   rdf
  •   rss2
Report problems with this website to our bug tracking system.
Contact the specific authors with questions and comments about the page contents.

KDE® and the K Desktop Environment® logo are registered trademarks of KDE e.V. | Legal