GetBagelConnectionUseCase
operator fun invoke(): Flowable<List<ConnectionHolder>> {
return Flowables.zip(
subscriptionRepository.getActiveSubscription(),
likesYouMatchCountRepository.getLocalCount()
)
.firstOrError()
.map { pair ->
val result = mutableListOf<ConnectionHolder>()
/*
Decide if we need to show the LIKE YOU upsell in the chat list.
Active subscription that has the likes you benefit.
*/
val hasActiveSubscription = pair.first.isPresent
&& pair.first.get().hasBenefit(BenefitKeys.LIKES_YOU)
val likesYouCount = pair.second
if (!hasActiveSubscription && likesYouCount > 1) {
result.add(
ConnectionHolder(
likesYouCount = likesYouCount,
type = ConnectionHolder.ConnectionHolderType.UPSELL_LIKES_YOU
)
)
}
// TODO to wrap these DB operations with Single.fromCallable and change the parent map to flatMap
val activeConnections = bagelConnectionRepository.getActiveConnections() // <-- needs to wait for Bagels/ Couples to be downloaded
result.addAll(activeConnections)
val expiredConnections = bagelConnectionRepository.getExpiredConnections() // <-- needs to wait for Bagels/ Couples to be downloaded
if (expiredConnections.isNotEmpty()) {
result.add(ConnectionHolder(type = ConnectionHolder.ConnectionHolderType.INACTIVE_HEADER))
result.addAll(expiredConnections)
}
result
}
.toFlowable() // <-- add this line to convert Single -> Flowable
.flatMap { connectionHolders ->
/*
The Connection Holders are missing names, photos and last messages
Go to Room to fetch
1. Names & photos using ProfileRepositoryV2
2. Last messages using ConnectionDetailsDao
*/
val profileIds_new = connectionHolders.mapNotNull { it.match?.profileId }
// TODO: remove this logic, as it limits the number so we dont overflow
val profileIds = profileIds_new.stream().limit(950).collect(Collectors.toList())
//android.util.Log.e("CTP","113 profiles" + profileIds.size)
profileRepositoryV2.getProfilesWithPhotos(profileIds) // <-- needs to return Flowable
.flatMap { listProfilePhotos ->
val profileMap = listProfilePhotos.associateBy { it.first.id }
connectionDetailsDao.getConnectionDetails(profileIds) // <-- needs to return Flowable
.map { connectionDetails ->
/*
We never clear the bagels in the DB so there's a chance
the connection list grows too large
It would make no sense to query thousands of connections
and try to fit them into memory would it?
*/
if (connectionDetails.size > 500 || connectionHolders.size > 500) {
val message = "Too many connections - connection details: ${connectionDetails.size} - active + expired bagels: ${connectionHolders.size}"
Logger.e(TAG, message, IllegalStateException(message))
}
val connectionDetailMap = connectionDetails.associateBy { it.profileId }
// fill-in name, photo and last message for the connection holders
val newConnectionHolders = connectionHolders.map { connectionHolder ->
if (connectionHolder.match?.profileId != null) {
val profileId = connectionHolder.match.profileId
connectionHolder.copy(
firstName = profileMap[profileId]?.first?.firstName,
imageUrl = profileMap[profileId]?.second?.firstOrNull()?.url,
connectionDetails = connectionDetailMap[profileId]
)
} else {
// The headers
connectionHolder
}
}
newConnectionHolders
.sortedBy { connectionHolder -> connectionHolder.type }
.sortedWith(lastMessageComparator)
}
}
}
}
ChatListFragment
updateChatListAsyncWork = bagelManager.getSuggestedMemCache()
.map(bagels -> {
Logger.d(TAG, "there are " + bagels.size() + " bagels in memory");
if (bagels.size()>0) {
Logger.d(TAG, "probably just downloaded " + bagels.size() + " bagels");
}
return bagels.size();
})
.distinctUntilChanged()
.toFlowable(BackpressureStrategy.ERROR)
.doOnSubscribe(subscription -> showLoading(true))
.flatMap((Function<Integer, Publisher<List<ConnectionHolder>>>) numBagels -> {
if (numBagels > 0) {
return getBagelConnectionUseCase.invoke();
} else {
return Flowable.just(Collections.emptyList());
}
})
.observeOn(AndroidSchedulers.mainThread())
.as(AutoDispose.autoDisposable(this))
.subscribe(connections -> {
showLoading(false);
if (isAdded()) {
showConnections(connections);
myConnectionsAdapter.setData(connections);
myConnectionsAdapter.notifyDataSetChanged();
}
}, throwable -> {
showLoading(false);
Logger.e(TAG, "error fetch connections", throwable);
});