|
import { Injectable } from '@angular/core'; |
|
import { Observable } from 'rxjs/Observable'; |
|
import { Subject } from 'rxjs/Subject'; |
|
import 'rxjs/add/operator/filter'; |
|
import 'rxjs/add/operator/map'; |
|
|
|
/**
 * Envelope that travels through the internal message queue: a payload
 * together with the name of the channel it was sent on.
 *
 * @typeParam T type of the transported payload. It defaults to `any` so that
 *              existing untyped usages (`IMessage` without a type argument)
 *              keep compiling unchanged.
 */
export interface IMessage<T = any> {
  /**
   * The name of the channel (see the `Channel(...)` decorator).
   */
  channel: string;

  /**
   * The message object.
   */
  data: T;
}
|
|
|
/**
 * A constructor type: any value that can be invoked with `new`, regardless
 * of the arguments it takes.
 *
 * @typeParam T instance type produced by the constructor. It defaults to
 *              `any` so that existing usages without a type argument keep
 *              their original meaning.
 */
export type NewableClass<T = any> = { new (...args: any[]): T };
|
|
|
/**
 * Class decorator for message classes.
 *
 * It stores the given channel name on the decorated class as a static
 * `channel` property. Every message class requires this annotation, because
 * a class without a `channel` can neither be sent nor subscribed to.
 *
 * @example
 * ```ts
 * @Channel('rrmc.work')
 * export class WorkMessage {
 *   constructor(public readonly data: boolean) {}
 * }
 * ```
 *
 * @param name the channel name to attach to the decorated class
 * @returns a decorator function that assigns `name` to `target.channel`
 */
export function Channel(name: string): (target: any) => void {
  return (target: any): void => {
    target.channel = name;
  };
}
|
|
|
/**
 * A small in-process publish/subscribe bus.
 *
 * Messages are instances of classes annotated with `@Channel(...)`. The
 * channel name is read from the static `channel` property of the message's
 * class and is used to route the message to the subscribers obtained via
 * {@link MessagingService.of}.
 */
@Injectable()
export class MessagingService {

  /** The single stream all messages pass through, each tagged with its channel. */
  private _message$: Subject<IMessage> = new Subject<IMessage>();

  /**
   * Reads the static `channel` property from a class (constructor).
   *
   * @param type the class to inspect; may be `null`/`undefined`
   * @returns the channel value, or `undefined` when `type` is nullish
   */
  private static channelOf(type: any): any {
    return type == null ? undefined : type.channel;
  }

  /**
   * Send a message.
   *
   * **Note**: The message is sent synchronously.
   *
   * **Note**: Every message class must have the decorator `@Channel`.
   * A message without a channel (including `null`/`undefined`) is not sent;
   * a debug warning is logged instead.
   *
   * ```ts
   * this.messaging.send(new AddFeatureCode(this.feature.code));
   * ```
   * @param message the message instance to send
   */
  send<T>(message: T): void {
    this.publish(message);
  }

  /**
   * Send a message.
   *
   * **Note**: The message is sent asynchronously, from a `setTimeout` callback.
   * The channel is resolved when the timer fires, not when this method is called.
   *
   * **Note**: Every message class must have the decorator `@Channel`.
   *
   * ```ts
   * this.messaging.sendAsync(new AddFeatureCode(this.feature.code), 200); // 200ms
   * ```
   * @param message the message instance to send
   * @param timeout the delay in milliseconds; the absolute value is used,
   *                so a negative number is treated as its positive counterpart
   */
  sendAsync<T>(message: T, timeout: number = 10): void {
    setTimeout(() => this.publish(message), Math.abs(timeout));
  }

  /**
   * Subscribe to messages of the given type. Whenever a message of that type
   * is sent, the returned observable emits the message object.
   *
   * ```ts
   * const s: Subscription = this.messaging.of(AddFeatureMessage)
   *   .subscribe((featureCode: AddFeatureMessage) => {
   *     console.log('add feature code: %s', featureCode.code);
   *   });
   *
   * // in the ngOnDestroy() method the subscription must be unsubscribed.
   * ```
   *
   * **Note**: Every subscription must be unsubscribed in the destroy method.
   *
   * @param messageType the message class to listen for
   * @returns an observable of the messages sent on that class's channel. If
   *          the class has no channel, a debug warning is logged and an
   *          observable that completes immediately without emitting is returned.
   */
  of<T>(messageType: NewableClass): Observable<T> {
    const channel = MessagingService.channelOf(messageType);
    if (!channel) {
      console.debug('>> WARNING: receive message not possible, because missing channel!');
      // Always hand back a real observable so that `.subscribe(...)` on the
      // result cannot fail with a TypeError.
      return new Observable<T>((observer) => {
        observer.complete();
      });
    }
    return this._message$
      .filter((message: IMessage) => message.channel === channel)
      .map((message: IMessage) => message.data);
  }

  /**
   * Resolves the channel of `message` from its class and pushes the message
   * onto the internal stream. Shared by `send` and `sendAsync`.
   *
   * @param message the message instance to emit
   */
  private publish<T>(message: T): void {
    // A nullish message has no constructor to read the channel from.
    const type = message == null ? undefined : (<any>message).constructor;
    const channel = MessagingService.channelOf(type);
    if (!channel) {
      console.debug('>> WARNING: Send message not possible, because missing channel!');
      return;
    }
    this._message$.next({
      channel,
      data: message
    });
  }
}