§与 Pekko 集成
Pekko 使用 Actor 模型来提高抽象级别,并提供更好的平台来构建正确的并发和可扩展应用程序。为了容错,它采用了“让它崩溃”模型,该模型在电信行业中已成功用于构建能够自我修复的应用程序 - 从不停止的系统。Actor 还提供了透明分布的抽象,以及构建真正可扩展和容错应用程序的基础。
§应用程序 Actor 系统
Pekko 可以与称为 Actor 系统的多个容器一起工作。Actor 系统管理其配置为使用的资源,以便运行其包含的 Actor。
Play 应用程序定义了一个特殊的 Actor 系统供应用程序使用。此 Actor 系统遵循应用程序生命周期,并在应用程序重启时自动重启。
§编写 Actor
要开始使用 Pekko,您需要编写一个 Actor。以下是一个简单的 Actor,它只是向任何要求它的人问好。
import org.apache.pekko.actor._
object HelloActor {
def props = Props[HelloActor]()
case class SayHello(name: String)
}
class HelloActor extends Actor {
import HelloActor._
def receive = {
case SayHello(name: String) =>
sender() ! "Hello, " + name
}
}
此 Actor 遵循一些 Pekko 约定
- 它发送/接收的消息,或其协议,在其伴随对象上定义
- 它还在其伴随对象上定义了一个
props
方法,该方法返回用于创建它的 props
§创建和使用 Actor
要创建和/或使用 Actor,您需要一个ActorSystem
。这可以通过声明对 ActorSystem 的依赖项来实现,如下所示
import javax.inject._
import actors.HelloActor
import org.apache.pekko.actor._
import play.api.mvc._
@Singleton
class Application @Inject() (system: ActorSystem, cc: ControllerComponents) extends AbstractController(cc) {
val helloActor = system.actorOf(HelloActor.props, "hello-actor")
// ...
}
actorOf
方法用于创建一个新的 Actor。请注意,我们已将此控制器声明为单例。这是必要的,因为我们正在创建 Actor 并存储对它的引用,如果控制器没有被作用域为单例,这意味着每次创建控制器时都会创建一个新的 Actor,这最终会导致异常,因为您不能在同一个系统中拥有两个具有相同名称的 Actor。
§向 Actor 询问事项
您可以对 Actor 做的最基本的事情是向它发送消息。当您向 Actor 发送消息时,不会有响应,它是“发送即忘”。这也称为告诉模式。
然而,在 Web 应用程序中,告诉模式通常没有用,因为 HTTP 是一种具有请求和响应的协议。在这种情况下,您更有可能想要使用询问模式。询问模式返回一个Future
,然后您可以将其映射到您自己的结果类型。
以下是使用我们的HelloActor
和ask模式的示例
import scala.concurrent.duration._
import org.apache.pekko.pattern.ask
implicit val timeout: Timeout = 5.seconds
def sayHello(name: String) = Action.async {
(helloActor ? SayHello(name)).mapTo[String].map { message => Ok(message) }
}
需要注意以下几点
- ask模式需要导入,然后它会在actor上提供一个
?
运算符。 - ask的返回值类型是
Future[Any]
,通常在向actor发送请求后,您需要做的第一件事是使用mapTo
方法将其映射到您期望的类型。 - 作用域中需要一个隐式超时 - ask模式必须有一个超时。如果actor响应时间超过此超时,则返回的future将以超时错误完成。
§依赖注入actor
如果您愿意,您可以让Guice实例化您的actor,并将actor引用绑定到它们,以便您的控制器和组件依赖它们。
例如,如果您想创建一个依赖于Play配置的actor,您可以这样做
import javax.inject._
import org.apache.pekko.actor._
import play.api.Configuration
object ConfiguredActor {
case object GetConfig
}
class ConfiguredActor @Inject() (configuration: Configuration) extends Actor {
import ConfiguredActor._
val config = configuration.getOptional[String]("my.config").getOrElse("none")
def receive = {
case GetConfig =>
sender() ! config
}
}
Play提供了一些帮助程序来帮助提供actor绑定。这些允许actor本身进行依赖注入,并允许将actor的actor引用注入到其他组件中。要使用这些帮助程序绑定actor,请创建一个模块,如依赖注入文档中所述,然后混合PekkoGuiceSupport
特征,并使用bindActor
方法绑定actor
import actors.ConfiguredActor
import com.google.inject.AbstractModule
import play.api.libs.concurrent.PekkoGuiceSupport
class MyModule extends AbstractModule with PekkoGuiceSupport {
override def configure = {
bindActor[ConfiguredActor]("configured-actor")
}
}
此actor将被命名为configured-actor
,并且还将使用configured-actor
名称进行注入限定。您现在可以在控制器和其他组件中依赖此actor
import javax.inject._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import actors.ConfiguredActor._
import org.apache.pekko.actor._
import org.apache.pekko.pattern.ask
import org.apache.pekko.util.Timeout
import play.api.mvc._
@Singleton
class Application @Inject() (
@Named("configured-actor") configuredActor: ActorRef,
components: ControllerComponents
)(
implicit ec: ExecutionContext
) extends AbstractController(components) {
implicit val timeout: Timeout = 5.seconds
def getConfig = Action.async {
(configuredActor ? GetConfig).mapTo[String].map { message => Ok(message) }
}
}
§依赖注入子actor
以上方法适用于注入根actor,但您创建的许多actor将是子actor,它们不绑定到Play应用程序的生命周期,并且可能具有传递给它们的附加状态。
为了帮助依赖注入子actor,Play利用了Guice的AssistedInject支持。
假设您有以下actor,它依赖于要注入的配置,以及一个键
import javax.inject._
import com.google.inject.assistedinject.Assisted
import org.apache.pekko.actor._
import play.api.Configuration
object ConfiguredChildActor {
case object GetConfig
trait Factory {
def apply(key: String): Actor
}
}
class ConfiguredChildActor @Inject() (configuration: Configuration, @Assisted key: String) extends Actor {
import ConfiguredChildActor._
val config = configuration.getOptional[String](key).getOrElse("none")
def receive = {
case GetConfig =>
sender() ! config
}
}
请注意,key
参数被声明为@Assisted
,这表明它将被手动提供。
我们还定义了一个Factory
特征,它接受key
,并返回一个Actor
。我们不会实现它,Guice 会为我们做到这一点,提供一个不仅传递我们的key
参数,而且还定位Configuration
依赖项并注入它的实现。由于该特征只返回一个Actor
,因此在测试此 Actor 时,我们可以注入一个返回任何 Actor 的工厂,例如,这允许我们注入一个模拟的子 Actor,而不是实际的子 Actor。
现在,依赖于此的 Actor 可以扩展InjectedActorSupport
,并且它可以依赖于我们创建的工厂
import javax.inject._
import org.apache.pekko.actor._
import play.api.libs.concurrent.InjectedActorSupport
object ParentActor {
case class GetChild(key: String)
}
class ParentActor @Inject() (
childFactory: ConfiguredChildActor.Factory
) extends Actor
with InjectedActorSupport {
import ParentActor._
def receive = {
case GetChild(key: String) =>
val child: ActorRef = injectedChild(childFactory(key), key)
sender() ! child
}
}
它使用injectedChild
来创建并获取对子 Actor 的引用,传入 key。第二个参数(本例中的key
)将用作子 Actor 的名称。
最后,我们需要绑定我们的 Actor。在我们的模块中,我们使用bindActorFactory
方法来绑定父 Actor,并将子工厂绑定到子实现
import actors._
import com.google.inject.AbstractModule
import play.api.libs.concurrent.PekkoGuiceSupport
class MyModule extends AbstractModule with PekkoGuiceSupport {
override def configure = {
bindActor[ParentActor]("parent-actor")
bindActorFactory[ConfiguredChildActor, ConfiguredChildActor.Factory]
}
}
这将使 Guice 自动绑定一个ConfiguredChildActor.Factory
的实例,当ConfiguredChildActor
实例化时,它将为ConfiguredChildActor
提供一个Configuration
实例。
§配置
默认的 Actor 系统配置是从 Play 应用程序配置文件中读取的。例如,要配置应用程序 Actor 系统的默认调度器,请将以下行添加到conf/application.conf
文件中
pekko.actor.default-dispatcher.fork-join-executor.parallelism-max = 64
pekko.actor.debug.receive = on
有关 Pekko 日志记录配置,请参阅配置日志记录。
§内置 Actor 系统名称
默认情况下,Play Actor 系统的名称为application
。您可以通过conf/application.conf
中的条目更改此名称
play.pekko.actor-system = "custom-name"
注意:如果您想将 Play 应用程序 ActorSystem 放入 Pekko 集群中,此功能很有用。
§使用您自己的 Actor 系统
虽然我们建议您使用内置的 Actor 系统,因为它设置了所有内容,例如正确的类加载器、生命周期钩子等,但没有任何东西可以阻止您使用自己的 Actor 系统。但是,务必确保您执行以下操作
- 注册一个停止钩子,以便在 Play 关闭时关闭 Actor 系统
- 从 Play 环境 中传入正确的类加载器,否则 Pekko 将无法找到您的应用程序类
- 确保您不要从默认的
pekko
配置中读取 Pekko 配置,该配置已由 Play 的 Actor 系统使用,因为这会导致问题,例如当系统尝试绑定到相同的远程端口时。
§Pekko 协调关闭
Play 使用 Pekko 的 协调关闭 处理 Application
和 Server
的关闭。更多信息请参见 协调关闭 通用部分。
注意:Play 仅处理其内部 ActorSystem
的关闭。如果您使用额外的 Actor 系统,请确保它们都已终止,并随时将您的终止代码迁移到 协调关闭。
§Pekko 集群
您可以使您的 Play 应用程序加入现有的 Pekko 集群。在这种情况下,建议您优雅地离开集群。Play 附带 Pekko 的协调关闭,它将负责优雅地离开。
如果您已经拥有自定义的集群离开代码,建议您将其替换为 Pekko 的处理方式。有关更多详细信息,请参见 Pekko 文档。
§更新 Pekko 版本
如果您想使用 Play 尚未使用的较新版本的 Pekko,您可以在您的 build.sbt
文件中添加以下内容
// The newer Pekko version you want to use.
val pekkoVersion = "1.0.0"
// Pekko dependencies used by Play
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoVersion,
"org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion,
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion,
"org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion,
// Only if you are using Pekko Testkit
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion
)
当然,其他 Pekko 工件可以被传递地添加。使用 sbt-dependency-graph 更好地检查您的构建并检查您需要显式添加哪些工件。
如果您尚未切换到 Netty 服务器后端,因此正在使用 Play 的默认 Pekko HTTP 服务器后端,您还需要更新 Pekko HTTP。因此,您还需要显式添加其依赖项
// The newer Pekko HTTP version you want to use.
val pekkoHTTPVersion = "1.0.0"
// Pekko HTTP dependency used by Play
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http-core" % pekkoHTTPVersion
)
注意:在进行此类更新时,请记住您需要遵循 Pekko 的 二进制兼容性规则。如果您手动添加其他 Pekko 工件,请记住保持所有 Pekko 工件的版本一致,因为 不允许混合版本。
注意:在解析依赖项时,sbt 将获取为此项目声明或传递添加的最新版本。这意味着如果 Play 依赖于比您声明的版本更新的 Pekko(或 Pekko HTTP)版本,则 Play 的版本将获胜。有关 sbt 如何在此处进行排除 的更多详细信息,请参见此处。
下一步:使用消息进行国际化
在此文档中发现错误?此页面的源代码可以在 此处 找到。在阅读 文档指南 后,请随时贡献拉取请求。有疑问或建议要分享?转到 我们的社区论坛,与社区开始对话。