diff --git a/CesiumAsync/include/CesiumAsync/SharedAssetDepot.h b/CesiumAsync/include/CesiumAsync/SharedAssetDepot.h index 2ddd21d9a..e5e7e87b0 100644 --- a/CesiumAsync/include/CesiumAsync/SharedAssetDepot.h +++ b/CesiumAsync/include/CesiumAsync/SharedAssetDepot.h @@ -76,39 +76,9 @@ class CESIUMASYNC_API SharedAssetDepot * @param factory The factory to use to fetch and create assets that don't * already exist in the depot. See \ref FactorySignature. */ - SharedAssetDepot(std::function factory) - : _assets(), - _assetsByPointer(), - _deletionCandidates(), - _totalDeletionCandidateMemoryUsage(0), - _mutex(), - _factory(std::move(factory)), - _pKeepAlive(nullptr) {} - - virtual ~SharedAssetDepot() { - // Ideally, when the depot is destroyed, all the assets it owns would become - // independent assets. But this is extremely difficult to manage in a - // thread-safe manner. - - // Since we're in the destructor, we can be sure no one has a reference to - // this instance anymore. That means that no other thread can be executing - // `getOrCreate`, and no async asset creations are in progress. - - // However, if assets owned by this depot are still alive, then other - // threads can still be calling addReference / releaseReference on some of - // our assets even while we're running the depot's destructor. Which means - // that we can end up in `markDeletionCandidate` at the same time the - // destructor is running. And in fact it's possible for a `SharedAsset` with - // especially poor timing to call into a `SharedAssetDepot` just after it is - // destroyed. - - // To avoid this, we use the _pKeepAlive field to maintain an artificial - // reference to this depot whenever it owns live assets. This should keep - // this destructor from being called except when all of its assets are also - // in the _deletionCandidates list. - - CESIUM_ASSERT(this->_assets.size() == this->_deletionCandidates.size()); - } + SharedAssetDepot(std::function factory); + + virtual ~SharedAssetDepot(); /** * @brief Gets an asset from the depot if it already exists, or creates it @@ -123,135 +93,31 @@ class CESIUMASYNC_API SharedAssetDepot SharedFuture> getOrCreate( const AsyncSystem& asyncSystem, const std::shared_ptr& pAssetAccessor, - const TAssetKey& assetKey) { - // We need to take care here to avoid two assets starting to load before the - // first asset has added an entry and set its maybePendingAsset field. - std::unique_lock lock(this->_mutex); - - auto existingIt = this->_assets.find(assetKey); - if (existingIt != this->_assets.end()) { - // We've already loaded (or are loading) an asset with this ID - we can - // just use that. - const AssetEntry& entry = *existingIt->second; - if (entry.maybePendingAsset) { - // Asset is currently loading. - return *entry.maybePendingAsset; - } else { - return asyncSystem.createResolvedFuture(entry.toResultUnderLock()) - .share(); - } - } - - // Calling the factory function while holding the mutex unnecessarily - // limits parallelism. It can even lead to a bug in the scenario where the - // `thenInWorkerThread` continuation is invoked immediately in the current - // thread, before `thenInWorkerThread` itself returns. That would result - // in an attempt to lock the mutex recursively, which is not allowed. - - // So we jump through some hoops here to publish "this thread is working - // on it", then unlock the mutex, and _then_ actually call the factory - // function. - Promise promise = asyncSystem.createPromise(); - - // We haven't loaded or started to load this asset yet. - // Let's do that now. - CesiumUtility::IntrusivePointer> - pDepot = this; - CesiumUtility::IntrusivePointer pEntry = - new AssetEntry(assetKey); - - auto future = - promise.getFuture() - .thenImmediately([pDepot, pEntry, asyncSystem, pAssetAccessor]() { - return pDepot->_factory(asyncSystem, pAssetAccessor, pEntry->key); - }) - .catchImmediately([](std::exception&& e) { - return CesiumUtility::Result< - CesiumUtility::IntrusivePointer>( - CesiumUtility::ErrorList::error( - std::string("Error creating asset: ") + e.what())); - }) - .thenInWorkerThread( - [pDepot, pEntry]( - CesiumUtility::Result< - CesiumUtility::IntrusivePointer>&& result) { - std::lock_guard lock(pDepot->_mutex); - - if (result.pValue) { - result.pValue->_pDepot = pDepot.get(); - pDepot->_assetsByPointer[result.pValue.get()] = - pEntry.get(); - } - - // Now that this asset is owned by the depot, we exclusively - // control its lifetime with a std::unique_ptr. - pEntry->pAsset = - std::unique_ptr(result.pValue.get()); - pEntry->errorsAndWarnings = std::move(result.errors); - pEntry->maybePendingAsset.reset(); - - // The asset is initially live because we have an - // IntrusivePointer to it right here. So make sure the depot - // stays alive, too. - pDepot->_pKeepAlive = pDepot; - - return pEntry->toResultUnderLock(); - }); - - SharedFuture> sharedFuture = - std::move(future).share(); - - pEntry->maybePendingAsset = sharedFuture; - - [[maybe_unused]] bool added = - this->_assets.emplace(assetKey, pEntry).second; - - // Should always be added successfully, because we checked above that the - // asset key doesn't exist in the map yet. - CESIUM_ASSERT(added); - - // Unlock the mutex and then call the factory function. - lock.unlock(); - promise.resolve(); - - return sharedFuture; - } + const TAssetKey& assetKey); /** * @brief Returns the total number of distinct assets contained in this depot, * including both active and inactive assets. */ - size_t getAssetCount() const { - std::lock_guard lock(this->_mutex); - return this->_assets.size(); - } + size_t getAssetCount() const; /** * @brief Gets the number of assets owned by this depot that are active, * meaning that they are currently being used in one or more places. */ - size_t getActiveAssetCount() const { - std::lock_guard lock(this->_mutex); - return this->_assets.size() - this->_deletionCandidates.size(); - } + size_t getActiveAssetCount() const; /** * @brief Gets the number of assets owned by this depot that are inactive, * meaning that they are not currently being used. */ - size_t getInactiveAssetCount() const { - std::lock_guard lock(this->_mutex); - return this->_deletionCandidates.size(); - } + size_t getInactiveAssetCount() const; /** * @brief Gets the total bytes used by inactive (unused) assets owned by this * depot. */ - int64_t getInactiveAssetTotalSizeBytes() const { - std::lock_guard lock(this->_mutex); - return this->_totalDeletionCandidateMemoryUsage; - } + int64_t getInactiveAssetTotalSizeBytes() const; private: // Disable copy @@ -266,61 +132,9 @@ class CESIUMASYNC_API SharedAssetDepot * depot lock; otherwise, false. */ void markDeletionCandidate(const TAssetType& asset, bool threadOwnsDepotLock) - override { - if (threadOwnsDepotLock) { - this->markDeletionCandidateUnderLock(asset); - } else { - std::lock_guard lock(this->_mutex); - this->markDeletionCandidateUnderLock(asset); - } - } - - void markDeletionCandidateUnderLock(const TAssetType& asset) { - auto it = this->_assetsByPointer.find(const_cast(&asset)); - CESIUM_ASSERT(it != this->_assetsByPointer.end()); - if (it == this->_assetsByPointer.end()) { - return; - } - - CESIUM_ASSERT(it->second != nullptr); - - AssetEntry& entry = *it->second; - entry.sizeInDeletionList = asset.getSizeBytes(); - this->_totalDeletionCandidateMemoryUsage += entry.sizeInDeletionList; + override; - this->_deletionCandidates.insertAtTail(entry); - - if (this->_totalDeletionCandidateMemoryUsage > - this->inactiveAssetSizeLimitBytes) { - // Delete the deletion candidates until we're below the limit. - while (this->_deletionCandidates.size() > 0 && - this->_totalDeletionCandidateMemoryUsage > - this->inactiveAssetSizeLimitBytes) { - AssetEntry* pOldEntry = this->_deletionCandidates.head(); - this->_deletionCandidates.remove(*pOldEntry); - - this->_totalDeletionCandidateMemoryUsage -= - pOldEntry->sizeInDeletionList; - - CESIUM_ASSERT( - pOldEntry->pAsset == nullptr || - pOldEntry->pAsset->_referenceCount == 0); - - if (pOldEntry->pAsset) { - this->_assetsByPointer.erase(pOldEntry->pAsset.get()); - } - - // This will actually delete the asset. - this->_assets.erase(pOldEntry->key); - } - } - - // If this depot is not managing any live assets, then we no longer need to - // keep it alive. - if (this->_assets.size() == this->_deletionCandidates.size()) { - this->_pKeepAlive.reset(); - } - } + void markDeletionCandidateUnderLock(const TAssetType& asset); /** * @brief Unmarks the given asset as a candidate for deletion. @@ -332,37 +146,9 @@ class CESIUMASYNC_API SharedAssetDepot */ void unmarkDeletionCandidate( const TAssetType& asset, - bool threadOwnsDepotLock) override { - if (threadOwnsDepotLock) { - this->unmarkDeletionCandidateUnderLock(asset); - } else { - std::lock_guard lock(this->_mutex); - this->unmarkDeletionCandidateUnderLock(asset); - } - } - - void unmarkDeletionCandidateUnderLock(const TAssetType& asset) { - auto it = this->_assetsByPointer.find(const_cast(&asset)); - CESIUM_ASSERT(it != this->_assetsByPointer.end()); - if (it == this->_assetsByPointer.end()) { - return; - } - - CESIUM_ASSERT(it->second != nullptr); - - AssetEntry& entry = *it->second; - bool isFound = this->_deletionCandidates.contains(entry); - - CESIUM_ASSERT(isFound); - - if (isFound) { - this->_totalDeletionCandidateMemoryUsage -= entry.sizeInDeletionList; - this->_deletionCandidates.remove(entry); - } + bool threadOwnsDepotLock) override; - // This depot is now managing at least one live asset, so keep it alive. - this->_pKeepAlive = this; - } + void unmarkDeletionCandidateUnderLock(const TAssetType& asset); /** * @brief An entry for an asset owned by this depot. This is reference counted @@ -468,4 +254,260 @@ class CESIUMASYNC_API SharedAssetDepot _pKeepAlive; }; +template +SharedAssetDepot::SharedAssetDepot( + std::function factory) + : _assets(), + _assetsByPointer(), + _deletionCandidates(), + _totalDeletionCandidateMemoryUsage(0), + _mutex(), + _factory(std::move(factory)), + _pKeepAlive(nullptr) {} + +template +SharedAssetDepot::~SharedAssetDepot() { + // Ideally, when the depot is destroyed, all the assets it owns would become + // independent assets. But this is extremely difficult to manage in a + // thread-safe manner. + + // Since we're in the destructor, we can be sure no one has a reference to + // this instance anymore. That means that no other thread can be executing + // `getOrCreate`, and no async asset creations are in progress. + + // However, if assets owned by this depot are still alive, then other + // threads can still be calling addReference / releaseReference on some of + // our assets even while we're running the depot's destructor. Which means + // that we can end up in `markDeletionCandidate` at the same time the + // destructor is running. And in fact it's possible for a `SharedAsset` with + // especially poor timing to call into a `SharedAssetDepot` just after it is + // destroyed. + + // To avoid this, we use the _pKeepAlive field to maintain an artificial + // reference to this depot whenever it owns live assets. This should keep + // this destructor from being called except when all of its assets are also + // in the _deletionCandidates list. + + CESIUM_ASSERT(this->_assets.size() == this->_deletionCandidates.size()); +} + +template +SharedFuture> +SharedAssetDepot::getOrCreate( + const AsyncSystem& asyncSystem, + const std::shared_ptr& pAssetAccessor, + const TAssetKey& assetKey) { + // We need to take care here to avoid two assets starting to load before the + // first asset has added an entry and set its maybePendingAsset field. + std::unique_lock lock(this->_mutex); + + auto existingIt = this->_assets.find(assetKey); + if (existingIt != this->_assets.end()) { + // We've already loaded (or are loading) an asset with this ID - we can + // just use that. + const AssetEntry& entry = *existingIt->second; + if (entry.maybePendingAsset) { + // Asset is currently loading. + return *entry.maybePendingAsset; + } else { + return asyncSystem.createResolvedFuture(entry.toResultUnderLock()) + .share(); + } + } + + // Calling the factory function while holding the mutex unnecessarily + // limits parallelism. It can even lead to a bug in the scenario where the + // `thenInWorkerThread` continuation is invoked immediately in the current + // thread, before `thenInWorkerThread` itself returns. That would result + // in an attempt to lock the mutex recursively, which is not allowed. + + // So we jump through some hoops here to publish "this thread is working + // on it", then unlock the mutex, and _then_ actually call the factory + // function. + Promise promise = asyncSystem.createPromise(); + + // We haven't loaded or started to load this asset yet. + // Let's do that now. + CesiumUtility::IntrusivePointer> + pDepot = this; + CesiumUtility::IntrusivePointer pEntry = new AssetEntry(assetKey); + + auto future = + promise.getFuture() + .thenImmediately([pDepot, pEntry, asyncSystem, pAssetAccessor]() { + return pDepot->_factory(asyncSystem, pAssetAccessor, pEntry->key); + }) + .catchImmediately([](std::exception&& e) { + return CesiumUtility::Result< + CesiumUtility::IntrusivePointer>( + CesiumUtility::ErrorList::error( + std::string("Error creating asset: ") + e.what())); + }) + .thenInWorkerThread( + [pDepot, + pEntry](CesiumUtility::Result< + CesiumUtility::IntrusivePointer>&& result) { + std::lock_guard lock(pDepot->_mutex); + + if (result.pValue) { + result.pValue->_pDepot = pDepot.get(); + pDepot->_assetsByPointer[result.pValue.get()] = pEntry.get(); + } + + // Now that this asset is owned by the depot, we exclusively + // control its lifetime with a std::unique_ptr. + pEntry->pAsset = + std::unique_ptr(result.pValue.get()); + pEntry->errorsAndWarnings = std::move(result.errors); + pEntry->maybePendingAsset.reset(); + + // The asset is initially live because we have an + // IntrusivePointer to it right here. So make sure the depot + // stays alive, too. + pDepot->_pKeepAlive = pDepot; + + return pEntry->toResultUnderLock(); + }); + + SharedFuture> sharedFuture = + std::move(future).share(); + + pEntry->maybePendingAsset = sharedFuture; + + [[maybe_unused]] bool added = this->_assets.emplace(assetKey, pEntry).second; + + // Should always be added successfully, because we checked above that the + // asset key doesn't exist in the map yet. + CESIUM_ASSERT(added); + + // Unlock the mutex and then call the factory function. + lock.unlock(); + promise.resolve(); + + return sharedFuture; +} + +template +size_t SharedAssetDepot::getAssetCount() const { + std::lock_guard lock(this->_mutex); + return this->_assets.size(); +} + +template +size_t SharedAssetDepot::getActiveAssetCount() const { + std::lock_guard lock(this->_mutex); + return this->_assets.size() - this->_deletionCandidates.size(); +} + +template +size_t SharedAssetDepot::getInactiveAssetCount() const { + std::lock_guard lock(this->_mutex); + return this->_deletionCandidates.size(); +} + +template +int64_t +SharedAssetDepot::getInactiveAssetTotalSizeBytes() + const { + std::lock_guard lock(this->_mutex); + return this->_totalDeletionCandidateMemoryUsage; +} + +template +void SharedAssetDepot::markDeletionCandidate( + const TAssetType& asset, + bool threadOwnsDepotLock) { + if (threadOwnsDepotLock) { + this->markDeletionCandidateUnderLock(asset); + } else { + std::lock_guard lock(this->_mutex); + this->markDeletionCandidateUnderLock(asset); + } +} + +template +void SharedAssetDepot::markDeletionCandidateUnderLock( + const TAssetType& asset) { + auto it = this->_assetsByPointer.find(const_cast(&asset)); + CESIUM_ASSERT(it != this->_assetsByPointer.end()); + if (it == this->_assetsByPointer.end()) { + return; + } + + CESIUM_ASSERT(it->second != nullptr); + + AssetEntry& entry = *it->second; + entry.sizeInDeletionList = asset.getSizeBytes(); + this->_totalDeletionCandidateMemoryUsage += entry.sizeInDeletionList; + + this->_deletionCandidates.insertAtTail(entry); + + if (this->_totalDeletionCandidateMemoryUsage > + this->inactiveAssetSizeLimitBytes) { + // Delete the deletion candidates until we're below the limit. + while (this->_deletionCandidates.size() > 0 && + this->_totalDeletionCandidateMemoryUsage > + this->inactiveAssetSizeLimitBytes) { + AssetEntry* pOldEntry = this->_deletionCandidates.head(); + this->_deletionCandidates.remove(*pOldEntry); + + this->_totalDeletionCandidateMemoryUsage -= pOldEntry->sizeInDeletionList; + + CESIUM_ASSERT( + pOldEntry->pAsset == nullptr || + pOldEntry->pAsset->_referenceCount == 0); + + if (pOldEntry->pAsset) { + this->_assetsByPointer.erase(pOldEntry->pAsset.get()); + } + + // This will actually delete the asset. + this->_assets.erase(pOldEntry->key); + } + } + + // If this depot is not managing any live assets, then we no longer need to + // keep it alive. + if (this->_assets.size() == this->_deletionCandidates.size()) { + this->_pKeepAlive.reset(); + } +} + +template +void SharedAssetDepot::unmarkDeletionCandidate( + const TAssetType& asset, + bool threadOwnsDepotLock) { + if (threadOwnsDepotLock) { + this->unmarkDeletionCandidateUnderLock(asset); + } else { + std::lock_guard lock(this->_mutex); + this->unmarkDeletionCandidateUnderLock(asset); + } +} + +template +void SharedAssetDepot::unmarkDeletionCandidateUnderLock( + const TAssetType& asset) { + auto it = this->_assetsByPointer.find(const_cast(&asset)); + CESIUM_ASSERT(it != this->_assetsByPointer.end()); + if (it == this->_assetsByPointer.end()) { + return; + } + + CESIUM_ASSERT(it->second != nullptr); + + AssetEntry& entry = *it->second; + bool isFound = this->_deletionCandidates.contains(entry); + + CESIUM_ASSERT(isFound); + + if (isFound) { + this->_totalDeletionCandidateMemoryUsage -= entry.sizeInDeletionList; + this->_deletionCandidates.remove(entry); + } + + // This depot is now managing at least one live asset, so keep it alive. + this->_pKeepAlive = this; +} + } // namespace CesiumAsync