10.1 可观察对象和RxJS

在Angular中,可以使用可观察对象作为数据架构的骨架来构建应用。使用可观察对象构造数据被称为响应式编程(reactive programming)。

可观察对象和响应式编程究竟是什么呢?响应式编程是一种处理异步数据流的编程方法。可观察对象是用来实现响应式编程的主要数据结构。必须承认,这些术语可能不怎么明确。因此,我们会在本章通过具体的例子来帮助你更好地理解这些概念。

10.1.1 注意:一些必备的RxJS相关知识

需要指出的是,本书的重点不是讲解响应式编程。有一些其他不错的资源可以教会你响应式编程的基础,你应该阅读它们。我们在下面列举了几个。

你可以将本章视为如何使用RxJS和Angular的入门教程,而不是RxJS和响应式编程的详细指南。

本章会详细解释我们接触到的RxJS概念和API,但如果RxJS对你来说还是个新鲜事物,那么你可能需要通过其他相关资源来补充知识。

本章使用Underscore.js

Underscore.js是一个流行的类库,为Array和Object这样的JavaScript数据结构提供函数式操作符。本章将在使用RxJS的同时大量使用它。如果在代码中看见了_,比如_.map或者_.sortBy,要知道这就是在使用Underscore.js类库。要查阅Underscore.js文档,请阅读http://underscorejs.org/。

10.1.2 学习响应式编程和RxJS

如果你只想学习RxJS,推荐阅读这篇文章。

●Andre Staltz的“你不容错过的响应式编程入门”(https://gist.github.com/staltz/868e7e9bc2a7b8c1f754)

在你了解一些RxJS背后的概念之后,下面的链接可以帮你在前进的道路上走得更远。

●“哪些静态操作符可以用来创建流?”(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/which-static.md)

●“哪些实例操作符可以在流上使用?”(https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/which-instance.md)

●RxMarbles:各种流操作的交互式图解(http://staltz.com/rxmarbles)

本章由始至终都会提供RxJS的API文档链接。RxJS文档有大量很棒的示例代码,阐明了不同的流和操作符是如何工作的。

Angular必须要用RxJS吗?

不,完全不必。可观察对象只是Angular众多数据模式中的一种。想了解其他数据模式,请参见第9章。

我想给你提个醒:起初学习RxJS时会有一些烧脑。但是相信我,你终将掌握它的要领,并且这些付出都是值得的。下面是一些关于流的重要概念,会对你有所帮助。

(1)承诺(promise)发出单个值,而流发出多个值。在应用中,流扮演着和承诺一样的角色。如果你是从回调函数转为承诺的话,会发现相对于回调函数,承诺在可读性和数据可维护性方面都有了很大的改进。同样,流也改进了承诺,可以在流上持续响应数据的变化(与此相反,承诺是一次性解决)。

(2)命令式代码“拉取”数据,而响应式流“推送”数据。在响应式编程中,代码订阅了数据变化时接收通知,流会把数据“推送”给这些订阅者。

(3)RxJS是函数式的。如果你热衷于像map、reduce和filter这样的函数式操作符,那么使用RxJS时会感到很轻松;因为在某种意义上讲,数据集合和强大的函数操作符同样适用于流。

(4)流是可组合的。可以把流想象成一个贯穿数据的操作管道。你可以订阅流中的任何部分,甚至可以组合它们来创建新的流。

10.2 聊天应用概览

在本章中,我们将使用RxJS构建聊天应用。界面截图如图10-1所示。

图10-1 完成后的聊天应用

我们通常会尝试在书中展现每一行代码。不过这个聊天应用有大量的活动部件,所以本章不会展现所有代码。

可以在文件夹code/rxjs/chat中找到本章的示例代码。在适当的时候,我们会告诉你在哪里可以找到你想要查看的内容。

本应用提供了几个机器人,你可以和它们聊天。先运行这些代码看看:

 cd code/rxjs/chat

 npm install

 npm run go

然后在浏览器中打开http://localhost:8080。

 如果上面的链接无法打开,请尝试这个链接:http://localhost:8080/webpack-dev-server/index.html。

 一些Windows用户在这个目录下运行npm install时可能会遇到问题。如果遇到了,请先确保自己是在Cygwin中运行这些命令行。

你在本应用中要注意以下几点:

●你可以点击会话(thread)和一个机器人聊天;

●机器人会根据各自的性格来回复你的消息;

●右上角的未读消息总数会自动同步。

下面来看看本应用是如何构造的。我们有:

●三个顶层Angular组件

●三个数据模型

●三个服务

让我们来逐个看看。

10.2.1 组件

将页面分解成三个顶层组件,如图10-2所示。

图10-2 聊天应用的顶层组件

●ChatNavBar:包含未读消息数。

●ChatThreads:展示一个可点击的会话列表,每个会话都包含最新消息和会话头像。

●ChatWindow:展示当前会话的消息和一个用来发送新消息的输入框。

10.2.2 数据模型

本应用同样包含三个数据模型,如图10-3所示。

●User:存储聊天参与者的相关信息。

●Message:存储一条单独的信息。

●Thread:存储一组消息的集合以及一些与这次会话有关的其他数据。

图10-3 聊天应用的数据模型

10.2.3 服务

在本应用中,每个数据模型都有其对应的服务。服务都是单例对象,有以下两个作用:

(1)提供应用可以订阅的数据流;

(2)提供操作符来添加或更改数据。

比如,UserService:

●发布一个流用来通知当前用户;

●提供一个setCurrentUser函数,用于设置当前用户(即从currentUser流发出当前用户)。

10.2.4 总结

大体上来说,本应用的数据架构很简明:

服务负责维护流,而流负责发出数据模型(例如Message);

组件订阅这些流并按照最新的值进行渲染。

比如,ChatThreads组件订阅ThreadService中的流来获取最新的会话列表,而ChatWindow组件订阅ThreadService中的流来获取最新的消息列表。

本章其余部分将深入探讨如何使用Angular和RxJS来实现此应用。我们首先实现数据模型,然后看看如何创建服务来管理流,最后实现组件。

10.3 实现数据模型

我们先从简单的部分开始,看看数据模型。

10.3.1 User

User类很简明,有id、name和avatarSrc三个属性。

code/rxjs/chat/app/ts/models.ts

export class User {

 id:string;

 constructor(public name:string,

       public avatarSrc:string){

  this.id = uuid();

 }

}

 注意上面的代码,我们在构造函数中使用了TypeScript的简写方式。当指明public name:string时,我们是在告诉TypeScript:(1)将name作为类的一个公有属性;(2)当创建一个新的实例时,把参数的值赋给这个属性。

10.3.2 Thread

同样,Thread也是一个简单的TypeScript类。

code/rxjs/chat/app/ts/models.ts

export class Thread {

 id:string;

 lastMessage:Message;

 name:string;

 avatarSrc:string;

 constructor(id?:string,

       name?:string,

       avatarSrc?:string){

  this.id = id || uuid();

  this.name = name;

  this.avatarSrc = avatarSrc;

 }

}

注意,我们在Thread类中保存了一个lastMessage的引用。这可以使我们在会话列表中显示最新消息。

10.3.3 Message

同样,Message也是个简单的TypeScript类,但是这里使用了一个形式略微不同的构造函数。

code/rxjs/chat/app/ts/models.ts

 lastMessage:Message;

构造函数中的这种模式允许我们使用构造函数中的关键字参数进行模拟。使用这种模式,可以使用任意的数据来创建一个新的Message,而且不用担心参数的顺序问题。比如,我们可以这样做:

 let msg1 = new Message();

 # or this

 let msg2 = new Message({

  text:"Hello Nate Murray!"

 })

看完了数据模型,我们再来看看第一个服务:UserService。

10.4 实现UserService

UserService的意义在于提供这样一个场所:应用可以在这里了解到当前用户信息,并在当前用户发生变化时通知应用的其他部件。

我们要做的第一件事是创建一个TypeScript类,并为它加上@Injectable注解。

code/rxjs/chat/app/ts/services/UserService.ts

export class UserService {

 // `currentUser` contains the current user

 currentUser:Subject<User> = new BehaviorSubject<User>(null);

 public setCurrentUser(newUser:User):void {

  this.currentUser.next(newUser);

 }

}

我们说这个服务是可注入的,意思是可以把它注入到应用中的其他组件中。简要来说,依赖注入有两大优点:

(1)让Angular来管理对象的生命周期;

(2)测试组件时更容易。

我们在第8章中深入讨论了它。如果你还没有阅读第8章,现在只需要知道可以把它注入到我们的组件中就可以了,代码如下:

  class MyComponent {

   constructor(public userService:UserService){

    // do something with `userService` here

   }

  }

10.4.1 currentUser流

接下来设置一个流,用来管理当前用户。

code/rxjs/chat/app/ts/services/UserService.ts

 currentUser:Subject<User> = new BehaviorSubject<User>(null);

这里发生了很多事,我们来逐一分解:

●定义了实例变量currentUser,它是一个Subject流;

●更准确地说,currentUser是一个包含User的BehaviorSubject;

●然而,这个流的初始值是null(构造函数参数)。

如果你没怎么用过RxJS的话,那么可能不知道Subject和BehaviorSubject是什么。你可以把Subject当作一个“读/写”流。

 从技术上来说,Subject同时了继承Observable和Observer

因为消息是即时发送的,所以新的订阅者会有丢失流中最新值的风险。这是流的一个副作用,而BehaviourSubject弥补了这一点。

BehaviourSubject有一个特殊的属性,用来存储最新的值。这意味着任何流的订阅者都会接收最新的值。这对于我们来说好极了,因为这意味着应用的任何部分都可以订阅UserService.currentUser流并且可以立即知道当前用户是谁。

10.4.2 设置新用户

当前用户改变时(例如登录),我们需要一个途径将新用户发布到流中。

有两种暴露API的方法可以做到这件事。

直接将新用户添加到流中

更新当前用户的最直接方法就是使用UserService的实例直接发布一个新的User对象到流中,如下所示。

userService.subscribe((newUser)=> {

 console.log('New User is:', newUser.name);

})

// => New User is:originalUserName

let u = new User('Nate', 'anImgSrc');

userService.currentUser.next(u);

// => New User is:Nate

 注意,这里使用了Subject的next方法来推送一个新值到流中。

这种做法的好处是可以复用流中现有的API,不需要引入任何新的代码或者API。

创建setCurrentUser(newUser:User)方法

另一种更新当前用户的方法是在UserService上创建一个辅助方法,如下所示。

code/rxjs/chat/app/ts/services/UserService.ts

 public setCurrentUser(newUser:User):void {

  this.currentUser.next(newUser);

 }

你会注意到我们仍然在使用currentUser流的next方法。为何还要这样做呢?

这样做的价值在于,currentUser的实现与流的实现进行了解耦。通过把next方法包裹在setCurrentUser方法里,我们有一定的空间来更改UserService的实现而不至于破坏实例。

在这个例子中,我不会强烈推荐其中某一种方法,但两种方法在大型项目中的可维护性上还是有显著区别的。

 第三种选项是把这些更改暴露为它们自己的流(也就是说我们把更改当前用户的这个“动作”放进流中)。我们会在下面的MessagesService中探讨这种模式。

10.4.3 UserService.ts

把所有代码整合起来,可以得到UserService的完整代码。

code/rxjs/chat/app/ts/services/UserService.ts

import {Injectable} from '@angular/core';

import {Subject, BehaviorSubject} from 'rxjs';

import {User} from '../models';

/**

* UserService manages our current user

*/

@Injectable()

export class UserService {

 // `currentUser` contains the current user

 currentUser:Subject<User> = new BehaviorSubject<User>(null);

 public setCurrentUser(newUser:User):void {

  this.currentUser.next(newUser);

 }

}

export var userServiceInjectables:Array<any> = [

 UserService

];

10.5 MessagesService

MessagesService是这个应用的支柱。此应用中的所有消息都要流经MessagesService。

相比于UserService,MessagesService包含一些更复杂的流,它由五个流组成:三个数据管理流和两个动作流。

三个数据管理流分别是:

●newMessages,发出每条新Message并且每条只发出一次;

●messages,发出一组当前的Messages;

●updates,在messages流上执行操作。

10.5.1 newMessages流

newMessages是一个Subject,用来发出每条新Message并且每条只发出一次。

code/rxjs/chat/app/ts/services/MessagesService.ts

export class MessagesService {

 // a stream that publishes new messages only once

 newMessages:Subject<Message> = new Subject<Message>();

我们还可以定义一个辅助方法来添加Message到这个流中。

code/rxjs/chat/app/ts/services/MessagesService.ts

 addMessage(message:Message):void {

  this.newMessages.next(message);

 }

有这样的一个流还是很有帮助的,它可以从一个会话中获取不属于某个特殊用户的所有消息。以回声机器人(Echo Bot)为例,如图10-4所示。

图10-4 回声机器人

当实现回声机器人时,我们不想进入一个重复机器人本身消息的死循环。

要实现这一点,我们可以订阅newMessages流并根据下面的条件过滤所有消息:

(1)是这个会话的一部分;

(2)不是机器人产生的。

你可以这样理解,对于一个给定的Thread,我们想要一个不包含这个User的消息流。

code/rxjs/chat/app/ts/services/MessagesService.ts

 messagesForThreadUser(thread:Thread, user:User):Observable<Message> {

  return this.newMessages

   .filter((message:Message)=> {

        // belongs to this thread

    return(message.thread.id === thread.id)&&

        // and isn't authored by this user

       (message.author.id!== user.id);

   });

 }

messagesForThreadUser接收一个Thread对象和一个User对象并返回一个经过筛选的新Message流。筛选条件是消息属于这个Thread,而且不是由这个User写的。也就是说,这是一个在此Thread中的其他人的消息流。

10.5.2 messages流

newMessages流发出单个的Message对象,而messages流发出一组最新的Message对象

code/rxjs/chat/app/ts/services/MessagesService.ts

 messages:Observable<Message[]>;

 类型Message[]等同于Array<Message>。另一种等价的写法是Observable<Array<Message>>。当定义messages流的类型为Observable<Message[]>时,表示这个流发出的是一个数组(Message对象的数组),而不是单个的Messages。

那么messages是如何填充的呢?为此我们需要讨论updates流和一种新的模式:操作流。

10.5.3 操作流模式

下面是操作流模式的基本理念:

●在messages流中维护状态,它会保存一个最新的Message数组;

●使用一个updates流,即应用于messages流的函数流

你可以这样理解:任何updates流上的函数都会更改当前的消息列表。updates流上的函数应该接收一个Message对象列表然后返回一个Message对象列表。让我们在代码中通过创建一个接口来使这个概念形式化。

code/rxjs/chat/app/ts/services/MessagesService.ts

interface IMessagesOperation extends Function {

 (messages:Message[]):Message[];

}

下面来定义updates流。

code/rxjs/chat/app/ts/services/MessagesService.ts

 // `updates` receives _operations_ to be applied to our `messages`

 // it's a way we can perform changes on *all* messages(that are currently

 // stored in `messages`)

 updates:Subject<any> = new Subject<any>();

记住,updates流接收用来应用到消息列表的操作。但是如何把这些关联起来呢?实现方法如下(在MessagesService的constructor中)。

code/rxjs/chat/app/ts/services/MessagesService.ts

 constructor(){

  this.messages = this.updates

   // watch the updates and accumulate operations on the messages

   .scan((messages:Message[],

       operation:IMessagesOperation)=> {

        return operation(messages);

       },

       initialMessages)

   // make sure we can share the most recent list of messages across anyone

这段代码引入了新的流函数:scan。如果你熟悉函数式编程的话,scan很像reduce:它为输入流中的每个元素运行函数并累加出一个值。scan的特别之处在于,它会把每个中间过程中计算出的结果值发送出去。也就是说,它不会等到流全部完成后再发送结果值;这正是我们想要的。

当调用this.updates.scan时,我们会创建一个新的流。这个流订阅了updates流。scan内部执行的每一次,我们都会得到:

(1)经过累加的messages流;

(2)将要应用的新operation。

然后返回新的Message[]。

10.5.4 共享流

关于流,你需要知道的一点是它们默认是不可共享的。也就是说,如果一个订阅者从流中读取了一个值,读完后这个值就永远消失了。在这个例子中,我们想:(1)在一些订阅者之间共享同样的流;(2)为任何未来的订阅者重播最新的值。

要做到这点,我们使用操作符publishReplay和refCount。

●publishReplay可以让我们在多个订阅者之间共享同一个订阅,并为未来的订阅者重播n个最新的值。(参见publish和replay

●refCount通过对可观察对象何时发出值进行管理,使publish方法的返回值用起来更加方便。

等等,refCount到底是干什么的?

refCount可能有一些不太好理解,因为它涉及一个如何管理“热”的可观察对象和“冷”的可观察对象。我们不打算深入讲解它的工作原理,读者可自行阅读相关文档。

●关于refCount的RxJS文档:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/refcount.md

●“Rx介绍:‘热’的可观察对象和‘冷’的可观察对象”:http://www.introtorx.com/Content/v1.0.10621.0/14_HotAndColdObservables.html#RefCount

●refCount弹珠图解:http://reactivex.io/documentation/operators/refcount.html

code/rxjs/chat/app/ts/services/MessagesService.ts

   // watch the updates and accumulate operations on the messages

   .scan((messages:Message[],

       operation:IMessagesOperation)=> {

        return operation(messages);

       },

      initialMessages)

   // make sure we can share the most recent list of messages across anyone

   // who's interested in subscribing and cache the last known list of

   // messages

   .publishReplay(1)

   .refCount();

10.5.5 把Message对象添加到messages流中

现在我们可以把一个Message对象添加到messages流中,如下所示:

var myMessage = new Message(/* params here…… */);

updates.next((messages:Message[]):Message[] => {

 return messages.concat(myMessage);

})

我们添加了一个操作到updates流中。因为messages流订阅了updates流,所以它会应用这个操作,而操作会使用concat把我们的newMessage合并到累加的messages列表之中。

 如果这里需要花费你一些时间来仔细思考,也没有关系。要是你不习惯这种编程风格的话,是会感觉有些陌生。

上面的方法有一个问题,那就是它使用起来有些繁琐。要是不用每次都写这种内部函数就好了。我们可以像下面这样做:

addMessage(newMessage:Message){

 updates.next((messages:Message[]):Message[] => {

  return messages.concat(newMessage);

 })

}

// somewhere else

var myMessage = new Message(/* params here…… */);

MessagesService.addMessage(myMessage);

现在好一些了,但它还不是“响应式的方式”。这是因为这种创建消息的行为不能和其他流组合。(该方法也绕过了newMessage流。稍后将进行更详细的讨论。)

创建新消息的响应式做法是用一个流来接收Message对象并把它添加到消息列表中。再次声明,如果你还没有习惯这种思维方式的话,那么这对于你来说会有些陌生。下面介绍实现它的方法。

首先,我们创建一个叫作create的动作流。(动作流这个术语只是用来描述它在服务中的角色。这个流本身只是一个普通的Subject。)

code/rxjs/chat/app/ts/services/MessagesService.ts

 // action streams

 create:Subject<Message> = new Subject<Message>();

接下来,我们在构造函数中配置了create流。

code/rxjs/chat/app/ts/services/MessagesService.ts

  this.create

   .map(function(message:Message):IMessagesOperation {

    return(messages:Message[])=> {

     return messages.concat(message);

    };

   })

map操作符和JavaScript中内置的Array.map很像,只不过它是在流上的工作。也就是说,它为流中的每一项运行函数并发出函数的返回值。

在这个例子中,我们的意思是“对于我们接收并作为输入的每个Message对象来说,都返回IMessagesOperation,它会把这个消息添加到消息列表中”。换句话说,这个流会发出一个函数,这个函数接收Message对象的列表并把这个Message对象添加到消息列表中。

现在有了create流,还有一件事要做:实际上,我们需要把create流连接到updates流。我们使用subscribe来完成。

code/rxjs/chat/app/ts/services/MessagesService.ts

  this.create

   .map(function(message:Message):IMessagesOperation {

    return(messages:Message[])=> {

     return messages.concat(message);

    };

   })

   .subscribe(this.updates);

我们在这里所做的就是订阅updates流来监听create流。这表示,如果create流接收了一个Message对象,那么它会发出一个IMessagesOperation;updates流会接收这个IMessagesOperation,然后把Message对象添加到messages流中。

图10-5展现了当前的情况。

图10-5 从create流开始创建新消息

这很棒!因为它意味着我们:

(1)从messages流中获取了当前消息列表;

(2)获得了在当前消息列表上进行操作的一种方式(通过updates流);

(3)通过一个简单易用的流把创建操作放在了updates流上(通过create流)。

不论在代码的什么地方,只要想获取最新消息列表,就必须要用messages流。但是还有一个问题,我们还没有把这个流程和newMessages流关联起来

如果有一种方式可以轻松地把这个流和任何newMessages流发出的Message关联起来,那就太好了。事实证明这很容易。

code/rxjs/chat/app/ts/services/MessagesService.ts

  this.newMessages

   .subscribe(this.create);

现在的情况如图10-6所示。

图10-6 从newMessages流开始创建新消息

现在的流程完整了!这也是两全其美的:我们能够通过订阅newMessages来获取单条消息;而如果只想要最新的消息列表,我们可以订阅messages流。

这里需要指出这个设计的一些影响:如果你直接订阅了newMessages流,必须要注意变化可能发生在下游。这里有三点需要考虑。

第一,显然不会有任何下游的更新应用于Message。

第二,在这个案例中,我们的Message对象是可变的。如果你订阅newMessages流并保存了Message的引用,那么这个Message的属性可能会产生变化。

第三,如果想利用Message的可变性,你可能无法做到。考虑这种情况:我们可以在updates流队列上增加一个操作,此操作复制每个Message然后改变这个副本。(与我们现在的做法相比,这应该是更好的设计。)在这个例子中,你不能依赖任何从newMessages流直接发出的Message,因为它们是可以改变的。

尽管如此,只要你记住这些注意事项,就应该不会有太大麻烦。

10.5.6 完整的MessagesService

完整的MessagesService代码如下。

code/rxjs/chat/app/ts/services/MessagesService.ts

import {Injectable} from '@angular/core';

import {Subject, Observable} from 'rxjs';

import {User, Thread, Message} from '../models';

let initialMessages:Message[] = [];

interface IMessagesOperation extends Function {

 (messages:Message[]):Message[];

}

@Injectable()

export class MessagesService {

 // a stream that publishes new messages only once

 newMessages:Subject<Message> = new Subject<Message>();

 // `messages` is a stream that emits an array of the most up to date messages

 messages:Observable<Message[]>;

 // `updates` receives _operations_ to be applied to our `messages`

 // it's a way we can perform changes on *all* messages(that are currently

 // stored in `messages`)

 updates:Subject<any> = new Subject<any>();

 // action streams

 create:Subject<Message> = new Subject<Message>();

 markThreadAsRead:Subject<any> = new Subject<any>();

 constructor(){

  this.messages = this.updates

   // watch the updates and accumulate operations on the messages

   .scan((messages:Message[],

       operation:IMessagesOperation)=> {

        return operation(messages);

       },

      initialMessages)

   // make sure we can share the most recent list of messages across anyone

   // who's interested in subscribing and cache the last known list of

   // messages

   .publishReplay(1)

   .refCount();

   // `create` takes a Message and then puts an operation(the inner function)

   // on the `updates` stream to add the Message to the list of messages.

   //

   // That is, for each item that gets added to `create`(by using `next`)

   // this stream emits a concat operation function.

   //

   // Next we subscribe `this.updates` to listen to this stream, which means

   // that it will receive each operation that is created

   //

   // Note that it would be perfectly acceptable to simply modify the

   // "addMessage" function below to simply add the inner operation function to

   // the update stream directly and get rid of this extra action stream

   // entirely.The pros are that it is potentially clearer.The cons are that

   // the stream is no longer composable.

   this.create

    .map(function(message:Message):IMessagesOperation {

     return(messages:Message[])=> {

      return messages.concat(message);

     };

    })

    .subscribe(this.updates);

   this.newMessages

    .subscribe(this.create);

   // similarly, `markThreadAsRead` takes a Thread and then puts an operation

   // on the `updates` stream to mark the Messages as read

   this.markThreadAsRead

    .map((thread:Thread)=> {

     return(messages:Message[])=> {

      return messages.map((message:Message)=> {

       // note that we're manipulating `message` directly here.Mutability

       // can be confusing and there are lots of reasons why you might want

       // to, say, copy the Message object or some other 'immutable' here

       if(message.thread.id === thread.id){

        message.isRead = true;

       }

       return message;

      });

     };

    })

    .subscribe(this.updates);

  }

  // an imperative function call to this action stream

  addMessage(message:Message):void {

   this.newMessages.next(message);

  }

  messagesForThreadUser(thread:Thread, user:User):Observable<Message> {

   return this.newMessages

    .filter((message:Message)=> {

         // belongs to this thread

     return(message.thread.id === thread.id)&&

         // and isn't authored by this user

        (message.author.id!== user.id);

    });

 }

}

export var messagesServiceInjectables:Array<any> = [

 MessagesService

];

10.5.7 试用MessagesService

如果你还没有完全理解,那么现在是个打开代码并随意尝试MessagesService的好时机,来感受一下它是如何运作的。在test/services/MessagesService.spec.ts中有一个示例,可以直接拿来使用。

要运行这个项目的测试,可以打开终端,然后输入以下代码:

cd /path/to/code/rxjs/chat // <—— your path will vary

npm install

karma start

首先创建一些数据模型的实例。

code/rxjs/chat/test/services/MessagesService.spec.ts

import {MessagesService} from '../../app/ts/services/services';

import {Message, User, Thread} from '../../app/ts/models';

describe('MessagesService',()=> {

 it('should test',()=> {

  let user:User = new User('Nate', '');

  let thread:Thread = new Thread('t1', 'Nate', '');

  let m1:Message = new Message({

   author:user,

   text:'Hi!',

   thread:thread

  });

  let m2:Message = new Message({

   author:user,

   text:'Bye!',

   thread:thread

  });

接下来,订阅几个流。

code/rxjs/chat/test/services/MessagesService.spec.ts

  let messagesService:MessagesService = new MessagesService();

  // listen to each message indivdually as it comes in

  messagesService.newMessages

   .subscribe((message:Message)=> {

    console.log('=> newMessages:' + message.text);

   });

  // listen to the stream of most current messages

  messagesService.messages

   .subscribe((messages:Message[])=> {

    console.log('=> messages:' + messages.length);

   });

  messagesService.addMessage(m1);

  messagesService.addMessage(m2);

  // => messages:1

  // => newMessages:Hi!

  // => messages:2

  // => newMessages:Bye!

 });

});

注意,尽管我们先订阅了newMessages并且newMessages是通过addMessage方法直接调用的,但是messages流的订阅先输出了日志。原因就是messages流订阅newMessages流早于测试代码中的订阅(当MessagesService实例化时)。(你不应该依赖于代码中单独的流的顺序,但是它为什么以这种方式运行是值得思考的。)

尝试使用MessagesService并感受一下这些流是如何工作的。我们将在下节中使用它们来构建ThreadsService。

10.6 ThreadsService

在ThreadsService中将定义四个流,它们分别发出:

(1)当前一组Thread的映射(threads流);

(2)按时间逆序排列的Thread列表(orderedthreads流);

(3)当前已选的Thread(currentThread流);

(4)当前已选Thread的Message列表(currentThreadMessages流)。

下面来讨论如何构建这里的每一个流。在这个过程中,我们还将学习更多关于RxJS的知识。

10.6.1 当前一组Thread的映射(threads流)

我们先来定义ThreadsService类和用来发出Thread的实例变量。

code/rxjs/chat/app/ts/services/ThreadsService.ts

import {Injectable} from '@angular/core';

import {Subject, BehaviorSubject, Observable} from 'rxjs';

import {Thread, Message} from '../models';

import {MessagesService} from './MessagesService';

import * as _ from 'underscore';

@Injectable()

export class ThreadsService {

 // `threads` is a observable that contains the most up to date list of threads

 threads:Observable<{ [key:string]:Thread }>;

注意,这个流会发出一个映射(即一个对象),将Thread的id作为string键,Thread本身作为值。

要创建一个用来维护当前会话列表的流,我们先附加到messagesService.messages流。

code/rxjs/chat/app/ts/services/ThreadsService.ts

 threads:Observable<{ [key:string]:Thread }>;

回忆一下,每次把一个新的Message对象添加到流时,messages流都会发出一个当前Message对象的数组。我们要查看每个Message对象并返回唯一的Threads列表。

code/rxjs/chat/app/ts/services/ThreadsService.ts

  this.threads = messagesService.messages

   .map((messages:Message[])=> {

    let threads:{[key:string]:Thread} = {};

    // Store the message's thread in our accumulator `threads`

    messages.map((message:Message)=> {

     threads[message.thread.id] = threads[message.thread.id] ||

      message.thread;

注意,每次都会创建一个新的threads列表。这样做的原因是,我们可能会彻底删除一些消息(例如离开对话)。因为每次我们都重新计算会话列表,所以自然而然地“删除”了没有消息的会话。

在会话列表中,我们想通过使用Thread中的最新Message来显示聊天预览。

图10-7 带有聊天预览功能的会话列表

要做到这一点,我们在每个Thread中都保存了最新的Message。通过比较sentAt时间就可以知道哪个Message是最新的。

code/rxjs/chat/app/ts/services/ThreadsService.ts

    // Cache the most recent message for each thread

    let messagesThread:Thread = threads[message.thread.id];

    if(!messagesThread.lastMessage ||

      messagesThread.lastMessage.sentAt < message.sentAt){

     messagesThread.lastMessage = message;

    }

   });

   return threads;

  });

把所有代码整合起来,threads流看起来如下所示。

code/rxjs/chat/app/ts/services/ThreadsService.ts

  this.threads = messagesService.messages

   .map((messages:Message[])=> {

    let threads:{[key:string]:Thread} = {};

    // Store the message's thread in our accumulator `threads`

    messages.map((message:Message)=> {

     threads[message.thread.id] = threads[message.thread.id] ||

      message.thread;

    // Cache the most recent message for each thread

    let messagesThread:Thread = threads[message.thread.id];

    if(!messagesThread.lastMessage ||

      messagesThread.lastMessage.sentAt < message.sentAt){

     messagesThread.lastMessage = message;

    }

   });

   return threads;

  });

试用ThreadsService

我们来试试ThreadsService。首先创建一些要用的数据模型。

code/rxjs/chat/test/services/ThreadsService.spec.ts

import {MessagesService, ThreadsService} from '../../app/ts/services/services';

import {Message, User, Thread} from '../../app/ts/models';

import * as _ from 'underscore';

describe('ThreadsService',()=> {

 it('should collect the Threads from Messages',()=> {

  let nate:User = new User('Nate Murray', '');

  let felipe:User = new User('Felipe Coury', '');

  let t1:Thread = new Thread('t1', 'Thread 1', '');

  let t2:Thread = new Thread('t2', 'Thread 2', '');

  let m1:Message = new Message({

   author:nate,

   text:'Hi!',

   thread:t1

  });

  let m2:Message = new Message({

   author:felipe,

   text:'Where did you get that hat?',

   thread:t1

  });

  let m3:Message = new Message({

   author:nate,

   text:'Did you bring the briefcase?',

   thread:t2

  });

创建服务的一个实例。

code/rxjs/chat/test/services/ThreadsService.spec.ts

  let messagesService:MessagesService = new MessagesService();

  let threadsService:ThreadsService = new ThreadsService(messagesService);

 注意,这里把messagesService作为参数传给了ThreadsService的构造函数。我们通常让依赖注入系统来处理这些,但在测试中可以自己提供依赖关系。

我们订阅threads流并把通过流的内容打印出来。

code/rxjs/chat/test/services/ThreadsService.spec.ts

  let threadsService:ThreadsService = new ThreadsService(messagesService);

  threadsService.threads

   .subscribe((threadIdx:{ [key:string]:Thread })=> {

    let threads:Thread[] = _.values(threadIdx);

    let threadNames:string = _.map(threads,(t:Thread)=> t.name)

                  .join(', ');

    console.log(`=> threads(${threads.length}):${threadNames} `);

   });

  messagesService.addMessage(m1);

  messagesService.addMessage(m2);

  messagesService.addMessage(m3);

  // => threads(1):Thread 1

  // => threads(1):Thread 1

  // => threads(2):Thread 1, Thread 2

 });

});

10.6.2 按时间逆序排列的Thread列表(orderedthreads流)

threads流给了我们一个映射,作为会话列表的一个“索引”。但是我们想让会话视图根据最新消息的时间来排序,如图10-8所示。

图10-8 按时间逆序排列的会话

创建一个新的流,它返回一个按最新Message时间排序的Thread数组。

我们首先定义orderedThreads并把它作为一个实例属性。

code/rxjs/chat/app/ts/services/ThreadsService.ts

 // `orderedThreads` contains a newest-first chronological list of threads

 orderedThreads:Observable<Thread[]>;

接下来,在constructor中通过订阅threads流并按最新消息时间排序定义orderedThreads。

code/rxjs/chat/app/ts/services/ThreadsService.ts

  this.orderedThreads = this.threads

   .map((threadGroups:{ [key:string]:Thread })=> {

    let threads:Thread[] = _.values(threadGroups);

    return _.sortBy(threads,(t:Thread)=> t.lastMessage.sentAt).reverse();

   });

10.6.3 当前已选的Thread(currentThread流)

我们的应用需要知道当前已选的Thread是哪个。这让我们知道:

(1)哪个会话应该在消息窗口显示;

(2)会话列表中的哪个会话应该被标记为当前会话(如图10-9所示)。

图10-9 使用·符号表示当前会话

创建一个BehaviorSubject并把它保存为currentThread流。

code/rxjs/chat/app/ts/services/ThreadsService.ts

 // `currentThread` contains the currently selected thread

 currentThread:Subject<Thread> =

  new BehaviorSubject<Thread>(new Thread());

注意,这里分配了一个空的Thread作为默认值。我们不再需要对currentThread进行更多配置了。

设置当前会话

要设置当前会话,currentThread流可以选择下面的其中一个方法:

(1)直接通过next方法提交新会话;

(2)添加一个辅助函数提交新会话。

我们定义一个辅助函数setCurrentThread,可以使用它来设置下一个会话。

code/rxjs/chat/app/ts/services/ThreadsService.ts

 setCurrentThread(newThread:Thread):void {

  this.currentThread.next(newThread);

 }

标记当前会话为已读

我们想要记录未读消息数量。如果切换到一个新Thread,要把那个Thread中的所有Message都标记为已读。我们拥有做到这些所需的工具:

(1)messagesService.makeThreadAsRead接收一个Thread,然后把这个Thread中的所有Message都标记为已读;

(2)currentThread流发出单个的Thread,它代表当前Thread。

要做的就是把它们关联起来。

code/rxjs/chat/app/ts/services/ThreadsService.ts

 this.currentThread.subscribe(this.messagesService.markThreadAsRead);

10.6.4 当前已选Thread的Message列表(currentThreadMessages流)

现在有了当前已选会话,需要确保显示这个Thread的Message列表(如图10-10所示)。

图10-10 当前消息列表来自反转机器人(Reverse Bot)

它的实现比表面上看起来要复杂一些。我们这样来实现它:

var theCurrentThread:Thread;

this.currentThread.subscribe((thread:Thread)=> {

 theCurrentThread = thread;

})

this.currentThreadMessages.map(

 (mesages:Message[])=> {

  return _.filter(messages,

   (message:Message)=> {

    return message.thread.id == theCurrentThread.id;

   })

 })

这种方法有什么问题?如果currentThread改变了,而currentThreadMessages完全不知道,那么currentThreadMessages就是一个过时了的消息列表!

如果颠倒一下呢?在一个变量中保存当前消息列表,然后订阅currentThread流的变化,会发生什么呢?还是会有同样的问题,只是这次我们知道会话变化,但是不知道有新消息进来。

如何解决这个问题呢?

原来,RxJS有一组操作符用来合并多个流。在这个例子中,我们想说的是“如果currentThread和messagesService.messages中的任何一个改变了,那么就要发出一些东西”。为此,我们使用combineLatest操作符

code/rxjs/chat/app/ts/services/ThreadsService.ts

  this.currentThreadMessages = this.currentThread

   .combineLatest(messagesService.messages,

          (currentThread:Thread, messages:Message[])=> {

当合并两个流时,会有一个先到达,不能保证在两个流上都有值,所以需要检查以确保有我们所需要的;否则就会返回一个空列表。

现在有了当前会话和消息列表,就可以过滤出我们想要的消息了。

code/rxjs/chat/app/ts/services/ThreadsService.ts

  this.currentThreadMessages = this.currentThread

   .combineLatest(messagesService.messages,

          (currentThread:Thread, messages:Message[])=> {

    if(currentThread && messages.length > 0){

     return _.chain(messages)

      .filter((message:Message)=>

          (message.thread.id === currentThread.id))

还有一个细节:既然我们已经找到了当前会话的消息,把这些消息标记为已读就是很方便的。

code/rxjs/chat/app/ts/services/ThreadsService.ts

     return _.chain(messages)

      .filter((message:Message)=>

          (message.thread.id === currentThread.id))

      .map((message:Message)=> {

       message.isRead = true;

       return message; })

      .value();

 关于是否应该在这里把消息标记为已读是有争议的。标记为已读的最大缺点就是我们更改了对象本身,而本质上这是一个“只读”会话。也就是说,这是一个有副作用的读操作,一般不应该使用。尽管如此,本应用中的currentThreadMessages流只作用于currentThread流,而currentThread流应始终把它的消息标记为已读。不过,我通常不推荐“有副作用的读操作”模式。

把所有代码整合起来,currentThreadMessages看起来是这样的。

code/rxjs/chat/app/ts/services/ThreadsService.ts

  this.currentThreadMessages = this.currentThread

   .combineLatest(messagesService.messages,

          (currentThread:Thread, messages:Message[])=> {

    if(currentThread && messages.length > 0){

     return _.chain(messages)

      .filter((message:Message)=>

          (message.thread.id === currentThread.id))

      .map((message:Message)=> {

       message.isRead = true;

       return message; })

      .value();

    } else {

     return [];

    }

   });

10.6.5 完整的ThreadsService

ThreadService完整代码如下所示。

code/rxjs/chat/app/ts/services/ThreadsService.ts

import {Injectable} from '@angular/core';

import {Subject, BehaviorSubject, Observable} from 'rxjs';

import {Thread, Message} from '../models';

import {MessagesService} from './MessagesService';

import * as _ from 'underscore';

@Injectable()

export class ThreadsService {

 // `threads` is a observable that contains the most up to date list of threads

 threads:Observable<{ [key:string]:Thread }>;

 // `orderedThreads` contains a newest-first chronological list of threads

 orderedThreads:Observable<Thread[]>;

 // `currentThread` contains the currently selected thread

 currentThread:Subject<Thread> =

  new BehaviorSubject<Thread>(new Thread());

 // `currentThreadMessages` contains the set of messages for the currently

 // selected thread

 currentThreadMessages:Observable<Message[]>;

 constructor(private messagesService:MessagesService){

  this.threads = messagesService.messages

   .map((messages:Message[])=> {

    let threads:{[key:string]:Thread} = {};

    // Store the message's thread in our accumulator `threads`

    messages.map((message:Message)=> {

     threads[message.thread.id] = threads[message.thread.id] ||

      message.thread;

     // Cache the most recent message for each thread

     let messagesThread:Thread = threads[message.thread.id];

     if(!messagesThread.lastMessage ||

       messagesThread.lastMessage.sentAt < message.sentAt){

      messagesThread.lastMessage = message;

     }

    });

    return threads;

   });

  this.orderedThreads = this.threads

   .map((threadGroups:{ [key:string]:Thread })=> {

    let threads:Thread[] = _.values(threadGroups);

    return _.sortBy(threads,(t:Thread)=> t.lastMessage.sentAt).reverse();

   });

  this.currentThreadMessages = this.currentThread

   .combineLatest(messagesService.messages,

          (currentThread:Thread, messages:Message[])=> {

    if(currentThread && messages.length > 0){

     return _.chain(messages)

      .filter((message:Message)=>

          (message.thread.id === currentThread.id))

      .map((message:Message)=> {

       message.isRead = true;

       return message; })

      .value();

    } else {

     return [];

    }

   });

  this.currentThread.subscribe(this.messagesService.markThreadAsRead);

 }

 setCurrentThread(newThread:Thread):void {

  this.currentThread.next(newThread);

 }

}

export var threadsServiceInjectables:Array<any> = [

 ThreadsService

];

10.7 总结

数据模型和服务已经完成!现在,我们拥有了连接到视图组件所需要的一切!在下章中,我们将构建三个重要的组件,用来渲染页面并和本章所创建的流进行交互。

  1. http://underscorejs.org/
  2. https://www.cygwin.com/
  3. 注意,@Injectable注解表示该类可以让Angular把其他服务注入进来,也就是说以该类作为目标。因此 在创建服务时,@Injectable注解并不是必需的,但官方的风格指南明确建议我们加上它。——译者注
  4. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/subject.md
  5. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md
  6. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observer.md
  7. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/behaviorsubject.md
  8. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/scan.md
  9. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/publish.md
  10. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/replay.md
  11. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/refcount.md
  12. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/select.md
  13. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/subscribe.md
  14. https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/combinelatestproto.md