§Streams 迁移指南
Play 2.5 对其流式传输数据和响应主体的方式进行了重大更改。
-
Play 2.5 使用 **Akka Streams 进行流式传输**。以前的 Play 版本使用迭代器进行流式传输,以及其他几种临时类型的流式传输,例如
WebSocket
、Chunks
等。使用 Akka Streams 的更改有两个主要好处。首先,Java 用户现在可以访问 Play 的全部功能集,例如编写主体解析器和过滤器。其次,流式传输库现在在 Play 中更加一致。
-
Play 2.5 使用 **
ByteString
来保存字节包**。以前,Play 使用字节数组 (byte[]
/ArrayByte
) 来保存字节。ByteString
类是不可变的,就像 Java 的String
一样,因此它更安全、更易于使用。与String
一样,它在构造时确实会复制其数据,因此会带来少量性能成本,但这可以通过其廉价的串联和子字符串操作来平衡。 -
Play 2.5 具有用于响应主体的新 **
HttpEntity
类型**。以前,响应主体是字节的简单流。HTTP 主体现在是HttpEntity
的一种类型:Strict
、Streamed
或Chunked
。通过告诉 Play 使用哪种类型的实体,应用程序可以更好地控制 Play 如何发送 HTTP 响应。它还使 Play 更容易优化其传递主体的过程。
§更改摘要
Play API 的以下部分已更新
- 结果 (
Result
主体,chunked
/feed
/stream
方法) - 操作 (
EssentialAction
) - 主体解析 (
BodyParser
) - WebSockets (
WebSocket
) - 服务器发送事件 (
EventSource
)
以下类型已更改
目的 | 旧类型 | 新类型 |
---|---|---|
保存字节 | byte[] /Array[Byte] |
ByteString |
生成流 | Enumerator , WebSocket.Out , Chunks.Out , EventSource.Out |
源 |
将流转换为另一个流 | Enumeratee |
Flow |
将流转换为单个值 | Iteratee |
Accumulator |
使用流 | Iteratee |
Sink |
§如何迁移(按 API)
以下部分概述了如何迁移使用 API 不同部分的代码。
§迁移分块结果 (chunked
, Results.Chunked
)
在 Play 2.4 中,您将使用 Enumerator
在 Scala 中创建分块结果,使用 Results.Chunked
对象在 Java 中创建分块结果。在 Play 2.5 中,这些 API 部分仍然可用,但已弃用。
如果您选择迁移到新 API,您可以通过在 StatusHeader
对象上调用 chunked
方法并为块流提供 Akka Streams Source
对象来创建分块结果。
更高级的用户可能更喜欢显式创建 HttpEntity.Chunked
对象并将其传递到 Result
对象构造函数中。
- 要了解如何将 Enumerator 迁移到 Source,请参阅 将 Enumerator 迁移到 Source。
§迁移流式结果 (feed
, stream
)(仅限 Scala)
在 Play 2.4 中,Scala 用户可以通过将 Enumerator
传递给 feed
或 stream
方法来流式传输结果。(Java 用户没有流式传输结果的方法,除了分块结果。)feed
方法流式传输 Enumerator
的数据,然后关闭连接。stream
方法,要么流式传输或分块结果,并可能关闭连接,具体取决于连接的 HTTP 版本以及 Content-Length
标头的存在与否。
在 Play 2.5 中,stream
方法已被移除,feed
方法已被弃用。您可以选择是否将 feed
方法迁移到新的 API。如果您使用 stream
方法,则需要更改您的代码。
新的 API 是直接创建一个 Result
对象并选择一个 HttpEntity
来表示其主体。对于流式结果,您可以使用 HttpEntity.Streamed
类。Streamed
类以 Source
作为主体,并带有一个可选的 Content-Length
头部值。Source
的内容将被发送到客户端。如果实体具有 Content-Length
头部,则连接将保持打开状态,否则将关闭以指示流的结束。
- 要了解如何将 Enumerator 迁移到 Source,请参阅 将 Enumerator 迁移到 Source。
§迁移 WebSockets (WebSocket
)
在 Play 2.4 中,WebSocket 的双向流在 Java 中用一对 WebSocket.In
和 WebSocket.Out
对象表示,在 Scala 中用一对 Enumerator
和 Iteratee
对象表示。在 Play 2.5 中,Java 和 Scala 现在都使用 Akka Streams Flow
来表示双向流。
要迁移 Play 2.5 中的 WebSocket 代码,您有两个选择。
第一个选择是使用旧的 Play API,该 API 已被弃用并更名为 LegacyWebSocket
。这是最简单的选择。您只需要更改引用 WebSocket
的代码,改为引用 LegacyWebSocket
。LegacyWebSocket
类为您提供了从 Play 2.4 迁移到 Play 2.5 的简便路径。
第二个选择是更改为新的 Play API。为此,您需要更改 WebSocket 代码以使用 Akka Streams Flow
对象。
§迁移 Scala WebSockets
Play 2.4 Scala WebSocket API 需要一个 Enumerator
/Iteratee
对,该对生成 In
对象并使用 Out
对象。一对 FrameFormatter
负责将数据从 In
和 Out
对象中取出。
case class WebSocket[In, Out](f: RequestHeader => Future[Either[Result, (Enumerator[In], Iteratee[Out, Unit]) => Unit]])(implicit val inFormatter: WebSocket.FrameFormatter[In], val outFormatter: WebSocket.FrameFormatter[Out]) extends Handler {
trait FrameFormatter[A] {
def transform[B](fba: B => A, fab: A => B): FrameFormatter[B]
}
Play 2.5 Scala WebSocket API 是围绕 Message
的 Flow
构建的。一个 Message
表示一个 WebSocket 帧。MessageFlowTransformer
类型负责将高级对象(如 JSON、XML 和字节)转换为 Message
帧。提供了一组内置的隐式 MessageFlowTransformer
,您也可以编写自己的 MessageFlowTransformer
。
trait WebSocket extends Handler {
def apply(request: RequestHeader): Future[Either[Result, Flow[Message, Message, _]]]
}
sealed trait Message
case class TextMessage(data: String) extends Message
case class BinaryMessage(data: ByteString) extends Message
case class CloseMessage(statusCode: Option[Int] = Some(CloseCodes.Regular), reason: String = "") extends Message
case class PingMessage(data: ByteString) extends Message
case class PongMessage(data: ByteString) extends Message
trait MessageFlowTransformer[+In, -Out] { self =>
def transform(flow: Flow[In, Out, _]): Flow[Message, Message, _]
}
要迁移,您需要将双向 Enumerator
/Iteratee
流转换为 Flow
。您可能还需要使用 MessageFlowTransformer
将 In
/Out
对象转换为 Message
,尽管对于 JSON 等常见类型来说,这不是必需的,因为提供了一些内置的隐式转换。
- 要了解如何将 Enumerator 迁移到 Source,请参阅 将 Enumerator 迁移到 Source。
- 要了解如何将 Iteratee 迁移到 Sink,请参阅 将 Iteratees 迁移到 Sinks 和 Accumulators。
§迁移 Java WebSockets
Play 2.4 的 Java WebSocket API 使用 WebSocket.In
对象来处理传入消息,并使用 WebSocket.Out
对象来发送传出消息。该 API 支持传输文本、字节或 JSON 帧的 WebSockets。
return WebSocket.whenReady((in, out) -> {
out.write("Hello!");
out.close();
});
新的 Play 2.5 API 更加强大。现在您可以创建一个 WebSocket
并返回任意的 WebSocket Message
帧。双向 Message
流表示为 Flow
。
public abstract class WebSocket {
public abstract CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request);
}
如果您想将 WebSocket Message
帧转换为您自己的类型,您可以使用 MappedWebSocketAcceptor
类。为您提供了几个此类类:Text
、Binary
和 Json
。例如
return WebSocket.Text.accept(requestHeader -> {
// return a Flow<String, String, ?>
})
您还可以通过定义如何转换传入传出消息来创建自己的 MappedWebSocketAcceptor
。
§迁移 Comet
要在 Play 中使用 [Comet](https://en.wikipedia.org/wiki/Comet_(programming)),您需要生成一个带有特殊格式块的块状 HTTP 响应。Play 有一个 Comet
类来帮助生成可以在服务器上发送到浏览器的事件。在 Play 2.4.x 中,必须创建一个新的 Comet 实例并使用 Java 的回调,以及使用 Scala 的 Enumeratee。在 Play 2.5 中,基于 Akka Streams 添加了新的 API。
§迁移 Java Comet
为您的对象创建一个 Akka Streams 源,并将它们转换为 String
或 JsonNode
对象。从那里,您可以使用 play.libs.Comet.string
或 play.libs.Comet.json
将您的对象转换为适合 Results.ok().chunked()
的格式。在 JavaComet 中有更多文档。
由于 Java Comet 助手是基于回调的,因此将基于回调的类直接转换为 org.reactivestreams.Publisher
并使用 Source.fromPublisher
创建源可能更容易。
§迁移 Scala Comet
为您的对象创建一个 Akka Streams 源,并将它们转换为 String
或 JsValue
对象。从那里,您可以使用 play.api.libs.Comet.string
或 play.api.libs.Comet.json
将您的对象转换为适合 Ok.chunked()
的格式。在 ScalaComet 中有更多文档。
§迁移服务器发送事件 (EventSource
)
要在 Play 中使用 服务器发送事件,您需要生成一个带有特殊格式块的块状 HTTP 响应。Play 有一个 EventSource
接口来帮助生成可以在服务器上发送到浏览器的事件。在 Play 2.4 的 Java 和 Scala 中,每个都有完全不同的 API,但在 Play 2.5 中,它们已经更改,因此它们都基于 Akka Streams。
§迁移 Java 服务器发送事件
在 Play 2.4 的 Java API 中,您使用 EventSource
生成块流,EventSource
是一个扩展 Chunks<String>
的类。您可以从字符串或 JSON 对象构造 Event
对象,然后通过调用 EventSource
的 send
方法将它们发送到响应中。
EventSource eventSource = new EventSource() {
@Override
public void onConnected() {
send(Event.event("hello"));
send(Event.event("world"));
...
}
};
return ok(eventSource);
在 Play 2.5 中,您通常会为应用程序对象创建一个 Akka Streams Source
,使用 Source.map
将您的对象转换为 Event
,最后使用 EventSource.chunked
将 Event
转换为分块值。以下示例展示了如何将字符串流发送出去。
Source<String, ?> stringSource = ...;
Source<EventSource.Event, ?> eventSource = myStrings.map(Event::event);
return ok().chunked(EventSource.chunked(eventSource)).as("text/event-stream");
- 要将
EventSource.onConnected
、EventSource.send
等迁移到Source
,请在类上实现org.reactivestreams.Publisher
,并使用Source.fromPublisher
从回调创建源。
如果您仍然想使用与 Play 2.4 中相同的 API,可以使用 LegacyEventSource
类。此类与 Play 2.4 API 相同,但已重命名并弃用。如果您想使用新的 API,但保留与旧的命令式 API 相同的感觉,可以尝试 GraphStage
。
§迁移 Scala 服务器发送事件
要使用 Play 2.4 的 Scala API,您需要提供一个应用程序对象的 Enumerator
,然后使用 EventSource
Enumeratee
将它们转换为 Event
。最后,将 Event
传递给 chunked
方法,在那里它们将被转换为块。
val someDataStream: Enumerator[SomeData] = ???
Ok.chunked(someDataStream &> EventSource())
在 Play 2.5 中,使用 Enumerator
和 Enumeratee
的 EventSource
已被弃用。您仍然可以使用 Enumerator
和 Enumeratee
,但建议您将代码转换为使用 Source
和 Flow
。Source
生成对象流,EventSource.flow
的 Flow
将它们转换为 Event
。例如,上面的代码将被重写为
val someDataStream: Source[SomeData, Unit] = ???
Ok.chunked(someDataStream via EventSource.flow).as("text/event-stream")
- 要了解如何将 Enumerator 迁移到 Source,请参阅 将 Enumerator 迁移到 Source。
§迁移自定义操作 (EssentialAction
)(仅限 Scala)
大多数 Scala 用户将使用 Action
类来执行他们的操作。Action
类是一种 EssentialAction
,它始终在运行其逻辑并发送结果之前完全解析其主体。一些用户可能编写了自己的自定义 EssentialAction
,以便他们可以执行诸如增量处理请求主体之类的操作。
如果您在 Play 2.4 应用程序中只使用普通的 Action
,那么它们不需要任何迁移。但是,如果您编写了 EssentialAction
,那么您需要将其迁移到 Play 2.5 中的新 API。EssentialAction
的行为仍然相同,但签名已从 Play 2.4 更改
trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result])
Play 2.5 中的新签名
trait EssentialAction extends (RequestHeader => Accumulator[ByteString, Result])
要迁移,您需要将 Iteratee
替换为 Accumulator
,并将 Array[Byte]
替换为 ByteString
。
- 要了解如何将 Iteratee 迁移到 Accumulator,请参阅 将 Iteratee 迁移到 Sink 和 Accumulator。
- 要了解如何将
Array[Byte]
迁移到ByteString
,请参阅 将字节数组迁移到 ByteString。
§迁移自定义主体解析器 (BodyParser
)(仅限 Scala)
如果您是 Scala 用户,并且在 Play 2.4 应用程序中拥有自定义 BodyParser
,那么您需要将其迁移到新的 Play 2.5 API。在 Play 2.4 中,BodyParser
特征签名如下所示
trait BodyParser[+A] extends (RequestHeader => Iteratee[Array[Byte], Either[Result, A]])
在 Play 2.5 中,它已更改为使用 Akka Streams 类型
trait BodyParser[+A] extends (RequestHeader => Accumulator[ByteString, Either[Result, A]])
要迁移,您需要将 Iteratee
替换为 Accumulator
,并将 Array[Byte]
替换为 ByteString
。
- 要了解如何将 Iteratee 迁移到 Accumulator,请参阅 将 Iteratee 迁移到 Sink 和 Accumulator。
- 要了解如何将
Array[Byte]
迁移到ByteString
,请参阅 将字节数组迁移到 ByteString。
§迁移 Result
主体(仅限 Scala)
Result
对象已更改其表示结果主体和连接关闭标志的方式。它不再接受 body: Enumerator[Array[Byte]], connection: Connection
,而是接受 body: HttpEntity
。HttpEntity
类型包含有关主体的信息以及有关如何关闭连接的隐式信息。
您可以通过使用包含 Source
以及可选的 Content-Length
和 Content-Type
标头的 Streamed
实体来迁移现有的 Enumerator
。
val bodyPublisher: Publisher[ByteString] = Streams.enumeratorToPublisher(bodyEnumerator)
val bodySource: Source[ByteString, _] = Source.fromPublisher(bodyPublisher)
val entity: HttpEntity = HttpEntity.Streamed(bodySource)
new Result(headers, entity)
有关迁移到这些类型的更多信息,请参阅有关迁移 Enumerator
和迁移到 ByteString
的部分。
- 要了解如何将 Iteratee 迁移到 Accumulator,请参阅 将 Iteratee 迁移到 Sink 和 Accumulator。
- 要了解如何将
Array[Byte]
迁移到ByteString
,请参阅 将字节数组迁移到 ByteString。
您可能会发现根本不需要为 Result
主体使用流。如果是这种情况,您可能希望为主体使用 Strict
实体。
new Result(headers, HttpEntity.Strict(bytes))
§如何迁移(按类型)
本节说明如何将字节数组和流迁移到新的 Akka Streams API。
Akka Streams 是 Akka 项目的一部分。Play 使用 Akka Streams 提供流功能:发送和接收字节序列和其他对象。Akka 项目提供了大量有关 Akka Streams 的良好文档。在您开始在 Play 中使用 Akka Streams 之前,值得查看 Akka Streams 文档以了解有哪些信息可用。
API 文档可以在主 Akka API 文档的 akka.stream
包下找到
在您开始使用 Akka Streams 时,Akka 文档的“基础知识和使用流”部分值得一看。它将向您介绍 Akka Streams API 的最重要的部分。
您无需一次性转换整个应用程序。您的应用程序的一部分可以继续使用迭代器,而其他部分可以使用 Akka Streams。Akka Streams 提供了 反应式流 实现,Play 的迭代器库也提供了反应式流实现,因此,Play 的迭代器可以轻松地包装在 Akka Streams 中,反之亦然。
§将字节数组 (byte[]
/Array[Byte]
) 迁移到 ByteString
请参考 Java 和 Scala API 文档了解 ByteString
。
示例
Scala
// Get the empty ByteString (this instance is cached)
ByteString.empty
// Create a ByteString from a String
ByteString("hello")
ByteString.fromString("hello")
// Create a ByteString from an Array[Byte]
ByteString(arr)
ByteString.fromArray(arr)
Java
// Get the empty ByteString (this instance is cached)
ByteString.empty();
// Create a ByteString from a String
ByteString.fromString("hello");
// Create a ByteString from an Array[Byte]
ByteString.fromArray(arr);
§将 *.Out
迁移到 Source
Play 现在使用 Source
来生成事件,而不是旧的 WebSocket.Out
、Chunks.Out
和 EventSource.Out
类。这些类使用起来很简单,但它们缺乏灵活性,并且没有正确实现 反 压 力。
您可以用任何生成流的 Source
替换您的 *.Out
类。有很多方法可以创建 Source
(Java/Scala。
如果您想用一个简单的对象替换您的 *.Out
,您可以向该对象写入消息,然后关闭它,而无需担心反压力,那么您可以使用 Source.actorRef
方法。
Java
Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
.mapMaterializedValue(sourceActor -> {
sourceActor.tell(ByteString.fromString("hello"), null);
sourceActor.tell(ByteString.fromString("world"), null);
sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
return null;
});
Scala
val source = Source.actorRef[ByteString](256, OverflowStrategy.dropNew).mapMaterializedValue { sourceActor =>
sourceActor ! ByteString("hello")
sourceActor ! ByteString("world")
sourceActor ! Status.Success(()) // close the source
}
§将 Enumerator
迁移到 Source
Play 在许多地方使用 Enumerator
来生成值流。
步骤 1:使用过渡 API(如果可用)
如果您使用 Results.chunked
或 Results.feed
,您可以继续使用现有方法。这些方法已被弃用,因此您可能希望更改您的代码。
步骤 2:使用适配器将 Enumerator
转换为 Source
您可以通过首先使用 Streams.enumeratorToPublisher
将现有的 Enumerator
转换为一个反应式流 Publisher
来将其转换为 Source
,然后您可以使用 Source.fromPublisher
将发布者转换为源,例如
val enumerator: Enumerator[T] = ...
val source = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator))
步骤 3:(可选)重写为 Source
以下是一些常见的枚举器工厂方法映射列表
Iteratees | Akka Streams | 备注 |
---|---|---|
Enumerator.apply(a) |
Source.single(a) |
|
Enumerator.apply(a, b) |
Source.apply(List(a, b))) |
|
Enumerator.enumerate(seq) |
Source.apply(seq) |
seq 必须是不可变的 |
Enumerator.repeat |
Source.repeat |
在 Akka Streams 中,重复元素不会每次都进行评估 |
Enumerator.empty |
Source.empty |
|
Enumerator.unfold |
Source.unfold |
|
Enumerator.generateM |
Source.unfoldAsync |
|
Enumerator.fromStream |
StreamConverters.fromInputStream |
|
Enumerator.fromFile |
StreamConverters.fromInputStream |
您需要为java.io.File 创建一个InputStream |
§将Iteratee
迁移到Sink
和Accumulator
步骤 1:使用适配器转换
您可以通过首先使用Streams.iterateeToSubscriber
将现有的Iteratee
转换为反应式流Subscriber
,然后使用Sink.fromSubscriber
将订阅者转换为接收器,例如
val iteratee: Iteratee[T, U] = ...
val (subscriber, resultIteratee) = Streams.iterateeToSubscriber(iteratee)
val sink = Sink.fromSubscriber(subscriber)
如果您需要返回一个Accumulator
,您可以使用Streams.iterateeToAccumulator
方法。
步骤 2:(可选)重写为Sink
以下是一些常见的迭代器工厂方法映射
Iteratees | Akka Streams | 备注 |
---|---|---|
Iteratee.fold |
Sink.fold |
|
Iteratee.head |
Sink.headOption |
|
Iteratee.getChunks |
Sink.seq |
|
Iteratee.foreach |
Sink.foreach |
|
Iteratee.ignore |
Sink.ignore |
|
Done |
Sink.cancelled |
物化值可以被映射以产生结果,或者如果使用累加器,则可以使用Accumulator.done 。 |
§将Enumeratee
迁移到Processor
步骤 1:使用适配器转换
您可以通过首先使用Streams.enumerateeToProcessor
将现有的Enumeratee
转换为反应式流Processor
,然后使用Flow.fromProcessor
将处理器转换为流,例如
val enumeratee: Enumeratee[A, B] = ...
val flow = Flow.fromProcessor(() => Streams.enumerateeToProcessor(enumeratee))
步骤 2:(可选)重写为Flow
以下是一些常见的枚举器工厂方法映射
Iteratees | Akka Streams | 备注 |
---|---|---|
Enumeratee.map |
Flow.map |
|
Enumeratee.mapM |
Flow.mapAsync |
您必须在 Akka Streams 中指定并行度,即一次并行映射多少个元素。 |
Enumeratee.mapConcat |
Flow.mapConcat |
|
Enumeratee.filter |
Flow.filter |
|
Enumeratee.take |
Flow.take |
|
Enumeratee.takeWhile |
Flow.takeWhile |
|
Enumeratee.drop |
Flow.drop |
|
Enumeratee.dropWhile |
Flow.dropWhile |
|
Enumeratee.collect |
Flow.collect |
下一步: Java 迁移指南
发现此文档中的错误?此页面的源代码可以在此处找到。在阅读文档指南后,请随时贡献拉取请求。有疑问或建议要分享?前往我们的社区论坛与社区开始对话。