private Observable<EsperEvent> sendDisconnectOnUnsubscribe(final Observable<EsperEvent> responseForThisRequest,
                                                           final CreateEventStreamRequest createEventStreamRequest) {
    return Observable.create(new Func1<Observer<EsperEvent>, Subscription>() {
        @Override
        public Subscription call(final Observer<EsperEvent> esperEventObserver) {
            final Subscription wrapped = responseForThisRequest.subscribe(new Action1<EsperEvent>() {
                @Override
                public void call(EsperEvent esperEvent) {
                    esperEventObserver.onNext(esperEvent);
                }
            });
            return Subscriptions.create(wrapped, sendUnsubscribeMessage());
        }

        private Subscription sendUnsubscribeMessage() {
            return new Subscription() {
                @Override
                public void unsubscribe() {
                    final String requestId = createEventStreamRequest.getId();
                    final EndEventStreamRequest disconnectEvent = new EndEventStreamRequest(requestId);
                    endRequestSender.sendBodyAndHeader(disconnectEvent, Exchange.CORRELATION_ID, requestId);
                }
            };
        }
    });
}
Everything is composable, so you can return a Subscription implementation that does whatever you want, as you are doing here.
The only thing to be aware of is that unsubscribe is not guaranteed to occur, whereas one of onError or onCompleted is.
If you want this to occur only on unsubscribe, then it's the correct solution. If you want it to always occur, then you may want to use the finallyDo operator instead, as it acts like a try/finally and will get invoked after completion regardless of how it completes. If this is an infinite sequence, then unsubscribe may very well be the right place, though it would not be triggered in the event of an error from the infinite sequence.
As far as an existing helper goes, for your use case it seems you are probably pursuing the correct approach. You could use a CompositeSubscription if you wanted (http://netflix.github.io/RxJava/javadoc/rx/subscriptions/CompositeSubscription.html) and have two Subscriptions combined into one: one of them being the real one, the other being your custom logic. The decorator pattern you're pursuing, though, feels more correct for what you're doing.
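
To make the "two Subscriptions combined into one" idea concrete, here is a minimal sketch. It does not use RxJava itself: Subscription, SimpleCompositeSubscription and CompositeDemo are hypothetical stand-ins written for this example, and the real CompositeSubscription may differ in details such as thread safety.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for rx.Subscription, so the sketch compiles without RxJava.
interface Subscription {
    void unsubscribe();
}

// Simplified illustration of the composite idea:
// unsubscribing the composite unsubscribes every member, in order.
final class SimpleCompositeSubscription implements Subscription {
    private final List<Subscription> members;

    SimpleCompositeSubscription(Subscription... members) {
        this.members = new ArrayList<>(Arrays.asList(members));
    }

    @Override
    public void unsubscribe() {
        for (Subscription member : members) {
            member.unsubscribe();
        }
    }
}

final class CompositeDemo {
    // Returns a log of what happened when the combined subscription was unsubscribed.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // The "real" subscription to the upstream sequence.
        Subscription real = () -> log.add("upstream unsubscribed");
        // The custom logic, standing in for sending the disconnect message.
        Subscription custom = () -> log.add("disconnect message sent");

        Subscription combined = new SimpleCompositeSubscription(real, custom);
        combined.unsubscribe();
        return log;
    }
}
```

Calling CompositeDemo.run() unsubscribes the upstream stand-in first and then runs the custom logic, which mirrors what the gist does by passing wrapped and sendUnsubscribeMessage() together.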
OK, thanks. I had found the CompositeSubscription, and I've updated the gist accordingly.
I guess this is an infinite sequence, as the termination of the sequence only occurs when the client unsubscribes.
I think maybe I'll call sendUnsubscribeMessage from onError and onCompleted as well then, just to be correct.
Thanks for your help.
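
One thing to watch when calling the cleanup from onError, onCompleted and unsubscribe is that more than one of those paths may fire for the same subscription, so the disconnect message could be sent twice. A rough sketch of one way to guard against that is below. Observer and Subscription are hypothetical stand-ins for the RxJava types, OnceOnlyCleanup and CleanupDemo are names invented for this example, and the Runnable stands in for whatever sendUnsubscribeMessage would do.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for rx.Observer and rx.Subscription.
interface Observer<T> {
    void onNext(T value);
    void onError(Throwable error);
    void onCompleted();
}

interface Subscription {
    void unsubscribe();
}

// Runs the cleanup action at most once, whichever path reaches it first.
final class OnceOnlyCleanup {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Runnable cleanup;

    OnceOnlyCleanup(Runnable cleanup) {
        this.cleanup = cleanup;
    }

    void run() {
        if (done.compareAndSet(false, true)) {
            cleanup.run();
        }
    }

    // Forwards all events and triggers the cleanup after either terminal event.
    <T> Observer<T> wrapObserver(final Observer<T> delegate) {
        return new Observer<T>() {
            @Override public void onNext(T value) { delegate.onNext(value); }
            @Override public void onError(Throwable error) {
                try { delegate.onError(error); } finally { OnceOnlyCleanup.this.run(); }
            }
            @Override public void onCompleted() {
                try { delegate.onCompleted(); } finally { OnceOnlyCleanup.this.run(); }
            }
        };
    }

    // Unsubscribes the delegate and then triggers the cleanup.
    Subscription wrapSubscription(final Subscription delegate) {
        return () -> {
            try { delegate.unsubscribe(); } finally { run(); }
        };
    }
}

final class CleanupDemo {
    // Completes the sequence and then unsubscribes; returns how often cleanup ran.
    static int cleanupCountAfterCompleteThenUnsubscribe() {
        final int[] count = {0};
        OnceOnlyCleanup cleanup = new OnceOnlyCleanup(() -> count[0]++);

        Observer<String> downstream = new Observer<String>() {
            @Override public void onNext(String value) { }
            @Override public void onError(Throwable error) { }
            @Override public void onCompleted() { }
        };
        Observer<String> guarded = cleanup.wrapObserver(downstream);
        Subscription subscription = cleanup.wrapSubscription(() -> { });

        guarded.onNext("event");
        guarded.onCompleted();      // first path: cleanup runs here
        subscription.unsubscribe(); // second path: cleanup is skipped
        return count[0];
    }
}
```

In this sketch the cleanup runs once even though both onCompleted and unsubscribe reach it. Whether a duplicate disconnect message would actually be harmful depends on the receiving side, which the thread does not describe.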
I want to perform some custom callback when a client unsubscribes from my existing Observable.
I can't think how to achieve this other than the above, but I'm sure I'm missing some existing helper.
Effectively, I just want to return an existing Observable with a decorated Subscription.