文档

§Streams 迁移指南

Play 2.5 对其流式传输数据和响应主体的方式进行了重大更改。

  1. Play 2.5 使用 **Akka Streams 进行流式传输**。以前的 Play 版本使用迭代器进行流式传输,以及其他几种临时类型的流式传输,例如 WebSocketChunks 等。

    使用 Akka Streams 的更改有两个主要好处。首先,Java 用户现在可以访问 Play 的全部功能集,例如编写主体解析器和过滤器。其次,流式传输库现在在 Play 中更加一致。

  2. Play 2.5 使用 **ByteString 来保存字节包**。以前,Play 使用字节数组 (byte[]/ArrayByte) 来保存字节。

    ByteString 类是不可变的,就像 Java 的 String 一样,因此它更安全、更易于使用。与 String 一样,它在构造时确实会复制其数据,因此会带来少量性能成本,但这可以通过其廉价的串联和子字符串操作来平衡。

  3. Play 2.5 具有用于响应主体的新 **HttpEntity 类型**。以前,响应主体是字节的简单流。HTTP 主体现在是 HttpEntity 的一种类型:StrictStreamedChunked

    通过告诉 Play 使用哪种类型的实体,应用程序可以更好地控制 Play 如何发送 HTTP 响应。它还使 Play 更容易优化其传递主体的过程。

§更改摘要

Play API 的以下部分已更新

以下类型已更改

目的 旧类型 新类型
保存字节 byte[]/Array[Byte] ByteString
生成流 Enumerator, WebSocket.Out, Chunks.Out, EventSource.Out
将流转换为另一个流 Enumeratee Flow
将流转换为单个值 Iteratee Accumulator
使用流 Iteratee Sink

§如何迁移(按 API)

以下部分概述了如何迁移使用 API 不同部分的代码。

§迁移分块结果 (chunked, Results.Chunked)

在 Play 2.4 中,您将使用 Enumerator 在 Scala 中创建分块结果,使用 Results.Chunked 对象在 Java 中创建分块结果。在 Play 2.5 中,这些 API 部分仍然可用,但已弃用。

如果您选择迁移到新 API,您可以通过在 StatusHeader 对象上调用 chunked 方法并为块流提供 Akka Streams Source 对象来创建分块结果。

更高级的用户可能更喜欢显式创建 HttpEntity.Chunked 对象并将其传递到 Result 对象构造函数中。

§迁移流式结果 (feed, stream)(仅限 Scala)

在 Play 2.4 中,Scala 用户可以通过将 Enumerator 传递给 feedstream 方法来流式传输结果。(Java 用户没有流式传输结果的方法,除了分块结果。)feed 方法流式传输 Enumerator 的数据,然后关闭连接。stream 方法,要么流式传输或分块结果,并可能关闭连接,具体取决于连接的 HTTP 版本以及 Content-Length 标头的存在与否。

在 Play 2.5 中,stream 方法已被移除,feed 方法已被弃用。您可以选择是否将 feed 方法迁移到新的 API。如果您使用 stream 方法,则需要更改您的代码。

新的 API 是直接创建一个 Result 对象并选择一个 HttpEntity 来表示其主体。对于流式结果,您可以使用 HttpEntity.Streamed 类。Streamed 类以 Source 作为主体,并带有一个可选的 Content-Length 头部值。Source 的内容将被发送到客户端。如果实体具有 Content-Length 头部,则连接将保持打开状态,否则将关闭以指示流的结束。

§迁移 WebSockets (WebSocket)

在 Play 2.4 中,WebSocket 的双向流在 Java 中用一对 WebSocket.InWebSocket.Out 对象表示,在 Scala 中用一对 EnumeratorIteratee 对象表示。在 Play 2.5 中,Java 和 Scala 现在都使用 Akka Streams Flow 来表示双向流。

要迁移 Play 2.5 中的 WebSocket 代码,您有两个选择。

第一个选择是使用旧的 Play API,该 API 已被弃用并更名为 LegacyWebSocket。这是最简单的选择。您只需要更改引用 WebSocket 的代码,改为引用 LegacyWebSocketLegacyWebSocket 类为您提供了从 Play 2.4 迁移到 Play 2.5 的简便路径。

第二个选择是更改为新的 Play API。为此,您需要更改 WebSocket 代码以使用 Akka Streams Flow 对象。

§迁移 Scala WebSockets

Play 2.4 Scala WebSocket API 需要一个 Enumerator/Iteratee 对,该对生成 In 对象并使用 Out 对象。一对 FrameFormatter 负责将数据从 InOut 对象中取出。

case class WebSocket[In, Out](f: RequestHeader => Future[Either[Result, (Enumerator[In], Iteratee[Out, Unit]) => Unit]])(implicit val inFormatter: WebSocket.FrameFormatter[In], val outFormatter: WebSocket.FrameFormatter[Out]) extends Handler {
trait FrameFormatter[A] {
  def transform[B](fba: B => A, fab: A => B): FrameFormatter[B]
}

Play 2.5 Scala WebSocket API 是围绕 MessageFlow 构建的。一个 Message 表示一个 WebSocket 帧MessageFlowTransformer 类型负责将高级对象(如 JSON、XML 和字节)转换为 Message 帧。提供了一组内置的隐式 MessageFlowTransformer,您也可以编写自己的 MessageFlowTransformer

trait WebSocket extends Handler {
  def apply(request: RequestHeader): Future[Either[Result, Flow[Message, Message, _]]]
}
sealed trait Message
case class TextMessage(data: String) extends Message
case class BinaryMessage(data: ByteString) extends Message
case class CloseMessage(statusCode: Option[Int] = Some(CloseCodes.Regular), reason: String = "") extends Message
case class PingMessage(data: ByteString) extends Message
case class PongMessage(data: ByteString) extends Message
trait MessageFlowTransformer[+In, -Out] { self =>
  def transform(flow: Flow[In, Out, _]): Flow[Message, Message, _]
}

要迁移,您需要将双向 Enumerator/Iteratee 流转换为 Flow。您可能还需要使用 MessageFlowTransformerIn/Out 对象转换为 Message,尽管对于 JSON 等常见类型来说,这不是必需的,因为提供了一些内置的隐式转换。

§迁移 Java WebSockets

Play 2.4 的 Java WebSocket API 使用 WebSocket.In 对象来处理传入消息,并使用 WebSocket.Out 对象来发送传出消息。该 API 支持传输文本、字节或 JSON 帧的 WebSockets。

return WebSocket.whenReady((in, out) -> {
    out.write("Hello!");
    out.close();
});

新的 Play 2.5 API 更加强大。现在您可以创建一个 WebSocket 并返回任意的 WebSocket Message 帧。双向 Message 流表示为 Flow

public abstract class WebSocket {
    public abstract CompletionStage<F.Either<Result, Flow<Message, Message, ?>>> apply(Http.RequestHeader request);
}

如果您想将 WebSocket Message 帧转换为您自己的类型,您可以使用 MappedWebSocketAcceptor 类。为您提供了几个此类类:TextBinaryJson。例如

return WebSocket.Text.accept(requestHeader -> {
  // return a Flow<String, String, ?>
})

您还可以通过定义如何转换传入传出消息来创建自己的 MappedWebSocketAcceptor

§迁移 Comet

要在 Play 中使用 [Comet](https://en.wikipedia.org/wiki/Comet_(programming)),您需要生成一个带有特殊格式块的块状 HTTP 响应。Play 有一个 Comet 类来帮助生成可以在服务器上发送到浏览器的事件。在 Play 2.4.x 中,必须创建一个新的 Comet 实例并使用 Java 的回调,以及使用 Scala 的 Enumeratee。在 Play 2.5 中,基于 Akka Streams 添加了新的 API。

§迁移 Java Comet

为您的对象创建一个 Akka Streams 源,并将它们转换为 StringJsonNode 对象。从那里,您可以使用 play.libs.Comet.stringplay.libs.Comet.json 将您的对象转换为适合 Results.ok().chunked() 的格式。在 JavaComet 中有更多文档。

由于 Java Comet 助手是基于回调的,因此将基于回调的类直接转换为 org.reactivestreams.Publisher 并使用 Source.fromPublisher 创建源可能更容易。

§迁移 Scala Comet

为您的对象创建一个 Akka Streams 源,并将它们转换为 StringJsValue 对象。从那里,您可以使用 play.api.libs.Comet.stringplay.api.libs.Comet.json 将您的对象转换为适合 Ok.chunked() 的格式。在 ScalaComet 中有更多文档。

§迁移服务器发送事件 (EventSource)

要在 Play 中使用 服务器发送事件,您需要生成一个带有特殊格式块的块状 HTTP 响应。Play 有一个 EventSource 接口来帮助生成可以在服务器上发送到浏览器的事件。在 Play 2.4 的 Java 和 Scala 中,每个都有完全不同的 API,但在 Play 2.5 中,它们已经更改,因此它们都基于 Akka Streams。

§迁移 Java 服务器发送事件

在 Play 2.4 的 Java API 中,您使用 EventSource 生成块流,EventSource 是一个扩展 Chunks<String> 的类。您可以从字符串或 JSON 对象构造 Event 对象,然后通过调用 EventSourcesend 方法将它们发送到响应中。

EventSource eventSource = new EventSource() {
    @Override
    public void onConnected() {
        send(Event.event("hello"));
        send(Event.event("world"));
        ...
    }
};
return ok(eventSource);

在 Play 2.5 中,您通常会为应用程序对象创建一个 Akka Streams Source,使用 Source.map 将您的对象转换为 Event,最后使用 EventSource.chunkedEvent 转换为分块值。以下示例展示了如何将字符串流发送出去。

Source<String, ?> stringSource = ...;
Source<EventSource.Event, ?> eventSource = myStrings.map(Event::event);
return ok().chunked(EventSource.chunked(eventSource)).as("text/event-stream");

如果您仍然想使用与 Play 2.4 中相同的 API,可以使用 LegacyEventSource 类。此类与 Play 2.4 API 相同,但已重命名并弃用。如果您想使用新的 API,但保留与旧的命令式 API 相同的感觉,可以尝试 GraphStage

§迁移 Scala 服务器发送事件

要使用 Play 2.4 的 Scala API,您需要提供一个应用程序对象的 Enumerator,然后使用 EventSource Enumeratee 将它们转换为 Event。最后,将 Event 传递给 chunked 方法,在那里它们将被转换为块。

val someDataStream: Enumerator[SomeData] = ???
Ok.chunked(someDataStream &> EventSource())

在 Play 2.5 中,使用 EnumeratorEnumerateeEventSource 已被弃用。您仍然可以使用 EnumeratorEnumeratee,但建议您将代码转换为使用 SourceFlowSource 生成对象流,EventSource.flowFlow 将它们转换为 Event。例如,上面的代码将被重写为

val someDataStream: Source[SomeData, Unit] = ???
Ok.chunked(someDataStream via EventSource.flow).as("text/event-stream")

§迁移自定义操作 (EssentialAction)(仅限 Scala)

大多数 Scala 用户将使用 Action 类来执行他们的操作。Action 类是一种 EssentialAction,它始终在运行其逻辑并发送结果之前完全解析其主体。一些用户可能编写了自己的自定义 EssentialAction,以便他们可以执行诸如增量处理请求主体之类的操作。

如果您在 Play 2.4 应用程序中只使用普通的 Action,那么它们不需要任何迁移。但是,如果您编写了 EssentialAction,那么您需要将其迁移到 Play 2.5 中的新 API。EssentialAction 的行为仍然相同,但签名已从 Play 2.4 更改

trait EssentialAction extends (RequestHeader => Iteratee[Array[Byte], Result])

Play 2.5 中的新签名

trait EssentialAction extends (RequestHeader => Accumulator[ByteString, Result])

要迁移,您需要将 Iteratee 替换为 Accumulator,并将 Array[Byte] 替换为 ByteString

§迁移自定义主体解析器 (BodyParser)(仅限 Scala)

如果您是 Scala 用户,并且在 Play 2.4 应用程序中拥有自定义 BodyParser,那么您需要将其迁移到新的 Play 2.5 API。在 Play 2.4 中,BodyParser 特征签名如下所示

trait BodyParser[+A] extends (RequestHeader => Iteratee[Array[Byte], Either[Result, A]])

在 Play 2.5 中,它已更改为使用 Akka Streams 类型

trait BodyParser[+A] extends (RequestHeader => Accumulator[ByteString, Either[Result, A]])

要迁移,您需要将 Iteratee 替换为 Accumulator,并将 Array[Byte] 替换为 ByteString

§迁移 Result 主体(仅限 Scala)

Result 对象已更改其表示结果主体和连接关闭标志的方式。它不再接受 body: Enumerator[Array[Byte]], connection: Connection,而是接受 body: HttpEntityHttpEntity 类型包含有关主体的信息以及有关如何关闭连接的隐式信息。

您可以通过使用包含 Source 以及可选的 Content-LengthContent-Type 标头的 Streamed 实体来迁移现有的 Enumerator

val bodyPublisher: Publisher[ByteString] = Streams.enumeratorToPublisher(bodyEnumerator)
val bodySource: Source[ByteString, _] = Source.fromPublisher(bodyPublisher)
val entity: HttpEntity = HttpEntity.Streamed(bodySource)
new Result(headers, entity)

有关迁移到这些类型的更多信息,请参阅有关迁移 Enumerator 和迁移到 ByteString 的部分。

您可能会发现根本不需要为 Result 主体使用流。如果是这种情况,您可能希望为主体使用 Strict 实体。

new Result(headers, HttpEntity.Strict(bytes))

§如何迁移(按类型)

本节说明如何将字节数组和流迁移到新的 Akka Streams API。

Akka Streams 是 Akka 项目的一部分。Play 使用 Akka Streams 提供流功能:发送和接收字节序列和其他对象。Akka 项目提供了大量有关 Akka Streams 的良好文档。在您开始在 Play 中使用 Akka Streams 之前,值得查看 Akka Streams 文档以了解有哪些信息可用。

API 文档可以在主 Akka API 文档的 akka.stream 包下找到

在您开始使用 Akka Streams 时,Akka 文档的“基础知识和使用流”部分值得一看。它将向您介绍 Akka Streams API 的最重要的部分。

您无需一次性转换整个应用程序。您的应用程序的一部分可以继续使用迭代器,而其他部分可以使用 Akka Streams。Akka Streams 提供了 反应式流 实现,Play 的迭代器库也提供了反应式流实现,因此,Play 的迭代器可以轻松地包装在 Akka Streams 中,反之亦然。

§将字节数组 (byte[]/Array[Byte]) 迁移到 ByteString

请参考 JavaScala API 文档了解 ByteString

示例

Scala

// Get the empty ByteString (this instance is cached)
ByteString.empty
// Create a ByteString from a String
ByteString("hello")
ByteString.fromString("hello")
// Create a ByteString from an Array[Byte]
ByteString(arr)
ByteString.fromArray(arr)

Java

// Get the empty ByteString (this instance is cached)
ByteString.empty();
// Create a ByteString from a String
ByteString.fromString("hello");
// Create a ByteString from an Array[Byte]
ByteString.fromArray(arr);

§*.Out 迁移到 Source

Play 现在使用 Source 来生成事件,而不是旧的 WebSocket.OutChunks.OutEventSource.Out 类。这些类使用起来很简单,但它们缺乏灵活性,并且没有正确实现 力。

您可以用任何生成流的 Source 替换您的 *.Out 类。有很多方法可以创建 Source (Java/Scala

如果您想用一个简单的对象替换您的 *.Out,您可以向该对象写入消息,然后关闭它,而无需担心反压力,那么您可以使用 Source.actorRef 方法。

Java

Source<ByteString, ?> source = Source.<ByteString>actorRef(256, OverflowStrategy.dropNew())
  .mapMaterializedValue(sourceActor -> {
    sourceActor.tell(ByteString.fromString("hello"), null);
    sourceActor.tell(ByteString.fromString("world"), null);
    sourceActor.tell(new Status.Success(NotUsed.getInstance()), null);
    return null;
  });

Scala

val source = Source.actorRef[ByteString](256, OverflowStrategy.dropNew).mapMaterializedValue { sourceActor =>
  sourceActor ! ByteString("hello")
  sourceActor ! ByteString("world")
  sourceActor ! Status.Success(()) // close the source
}

§Enumerator 迁移到 Source

Play 在许多地方使用 Enumerator 来生成值流。

步骤 1:使用过渡 API(如果可用)

如果您使用 Results.chunkedResults.feed,您可以继续使用现有方法。这些方法已被弃用,因此您可能希望更改您的代码。

步骤 2:使用适配器将 Enumerator 转换为 Source

您可以通过首先使用 Streams.enumeratorToPublisher 将现有的 Enumerator 转换为一个反应式流 Publisher 来将其转换为 Source,然后您可以使用 Source.fromPublisher 将发布者转换为源,例如

val enumerator: Enumerator[T] = ...
val source = Source.fromPublisher(Streams.enumeratorToPublisher(enumerator))

步骤 3:(可选)重写为 Source

以下是一些常见的枚举器工厂方法映射列表

Iteratees Akka Streams 备注
Enumerator.apply(a) Source.single(a)
Enumerator.apply(a, b) Source.apply(List(a, b)))
Enumerator.enumerate(seq) Source.apply(seq) seq 必须是不可变的
Enumerator.repeat Source.repeat 在 Akka Streams 中,重复元素不会每次都进行评估
Enumerator.empty Source.empty
Enumerator.unfold Source.unfold
Enumerator.generateM Source.unfoldAsync
Enumerator.fromStream StreamConverters.fromInputStream
Enumerator.fromFile StreamConverters.fromInputStream 您需要为java.io.File创建一个InputStream

§Iteratee迁移到SinkAccumulator

步骤 1:使用适配器转换

您可以通过首先使用Streams.iterateeToSubscriber将现有的Iteratee转换为反应式流Subscriber,然后使用Sink.fromSubscriber将订阅者转换为接收器,例如

val iteratee: Iteratee[T, U] = ...
val (subscriber, resultIteratee) = Streams.iterateeToSubscriber(iteratee)
val sink = Sink.fromSubscriber(subscriber)

如果您需要返回一个Accumulator,您可以使用Streams.iterateeToAccumulator方法。

步骤 2:(可选)重写为Sink

以下是一些常见的迭代器工厂方法映射

Iteratees Akka Streams 备注
Iteratee.fold Sink.fold
Iteratee.head Sink.headOption
Iteratee.getChunks Sink.seq
Iteratee.foreach Sink.foreach
Iteratee.ignore Sink.ignore
Done Sink.cancelled 物化值可以被映射以产生结果,或者如果使用累加器,则可以使用Accumulator.done

§Enumeratee迁移到Processor

步骤 1:使用适配器转换

您可以通过首先使用Streams.enumerateeToProcessor将现有的Enumeratee转换为反应式流Processor,然后使用Flow.fromProcessor将处理器转换为流,例如

val enumeratee: Enumeratee[A, B] = ...
val flow = Flow.fromProcessor(() => Streams.enumerateeToProcessor(enumeratee))

步骤 2:(可选)重写为Flow

以下是一些常见的枚举器工厂方法映射

Iteratees Akka Streams 备注
Enumeratee.map Flow.map
Enumeratee.mapM Flow.mapAsync 您必须在 Akka Streams 中指定并行度,即一次并行映射多少个元素。
Enumeratee.mapConcat Flow.mapConcat
Enumeratee.filter Flow.filter
Enumeratee.take Flow.take
Enumeratee.takeWhile Flow.takeWhile
Enumeratee.drop Flow.drop
Enumeratee.dropWhile Flow.dropWhile
Enumeratee.collect Flow.collect

下一步: Java 迁移指南


发现此文档中的错误?此页面的源代码可以在此处找到。在阅读文档指南后,请随时贡献拉取请求。有疑问或建议要分享?前往我们的社区论坛与社区开始对话。