§与 Pekko 集成
Pekko 使用 Actor 模型来提高抽象级别,并提供更好的平台来构建正确的并发和可扩展应用程序。为了实现容错,它采用了“让它崩溃”模型,该模型在电信行业中取得了巨大成功,用于构建能够自我修复的应用程序——永不停止的系统。Actor 还提供了透明分布的抽象,以及构建真正可扩展和容错应用程序的基础。
§应用程序 Actor 系统
Pekko 可以与称为 Actor 系统的多个容器一起使用。Actor 系统管理其配置为使用的资源,以运行其包含的 Actor。
Play 应用程序定义了一个特殊的 Actor 系统供应用程序使用。此 Actor 系统遵循应用程序生命周期,并在应用程序重启时自动重启。
§编写 Actor
要开始使用 Pekko,您需要编写一个 Actor。下面是一个简单的 Actor,它只是向任何向它问好的人问好。
package actors;
import org.apache.pekko.actor.*;
import org.apache.pekko.japi.*;
import actors.HelloActorProtocol.*;
public class HelloActor extends AbstractActor {
public static Props getProps() {
return Props.create(HelloActor.class);
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
SayHello.class,
hello -> {
String reply = "Hello, " + hello.name;
sender().tell(reply, self());
})
.build();
}
}
请注意,HelloActor
定义了一个名为 getProps
的静态方法,此方法返回一个 Props
对象,该对象描述了如何创建 Actor。这是一个很好的 Pekko 约定,将实例化逻辑与创建 Actor 的代码分离。
这里展示的另一个最佳实践是,HelloActor
发送和接收的消息被定义为另一个名为 HelloActorProtocol
的类的静态内部类。
package actors;
public class HelloActorProtocol {
public static class SayHello {
public final String name;
public SayHello(String name) {
this.name = name;
}
}
}
§创建和使用 Actor
要创建和/或使用 Actor,您需要一个 ActorSystem
。这可以通过声明对 ActorSystem
的依赖来实现,然后您可以使用 actorOf
方法来创建一个新的 Actor。
您可以对 Actor 执行的最基本操作是向其发送消息。当您向 Actor 发送消息时,不会收到任何响应,它是“发送即忘”。这也被称为 tell 模式。
然而,在 Web 应用程序中,tell 模式通常没有用,因为 HTTP 是一种具有请求和响应的协议。在这种情况下,您更有可能想要使用 ask 模式。ask 模式返回一个 Scala Future
,您可以使用 scala.compat.java8.FutureConverts.toJava
将其转换为 Java CompletionStage
,然后映射到您自己的结果类型。
以下是用 ask 模式使用 HelloActor
的示例
import org.apache.pekko.actor.*;
import play.mvc.*;
import scala.jdk.javaapi.FutureConverters;
import javax.inject.*;
import java.util.concurrent.CompletionStage;
import static org.apache.pekko.pattern.Patterns.ask;
@Singleton
public class Application extends Controller {
final ActorRef helloActor;
@Inject
public Application(ActorSystem system) {
helloActor = system.actorOf(HelloActor.getProps());
}
public CompletionStage<Result> sayHello(String name) {
return FutureConverters.asJava(ask(helloActor, new SayHello(name), 1000))
.thenApply(response -> ok((String) response));
}
}
需要注意的几点
- ask 模式需要导入,通常最方便的是静态导入
ask
方法。 - 返回的 Future 被转换为
CompletionStage
。生成的 Promise 是一个CompletionStage<Object>
,因此当您访问其值时,需要将其强制转换为您期望从 Actor 返回的类型。 - ask 模式需要一个超时时间,我们提供了 1000 毫秒。如果 Actor 响应时间超过此时间,则返回的 Promise 将以超时错误完成。
- 由于我们在构造函数中创建了 Actor,因此我们需要将控制器范围设置为
Singleton
,这样每次使用此控制器时都不会创建新的 Actor。
§依赖注入 Actor
如果您愿意,您可以让 Guice 实例化您的 Actor 并将 Actor 引用绑定到它们,以便您的控制器和组件可以依赖它们。
例如,如果您想创建一个依赖于 Play 配置的 Actor,您可以这样做
import com.typesafe.config.Config;
import javax.inject.Inject;
import org.apache.pekko.actor.AbstractActor;
public class ConfiguredActor extends AbstractActor {
private Config configuration;
@Inject
public ConfiguredActor(Config configuration) {
this.configuration = configuration;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
ConfiguredActorProtocol.GetConfig.class,
message -> {
sender().tell(configuration.getString("my.config"), self());
})
.build();
}
}
Play 提供了一些帮助程序来帮助提供 Actor 绑定。这些允许 Actor 本身进行依赖注入,并允许将 Actor 引用注入到其他组件中。要使用这些帮助程序绑定 Actor,请创建一个模块,如 依赖注入文档 中所述,然后混合 PekkoGuiceSupport
接口,并使用 bindActor
方法绑定 Actor
import com.google.inject.AbstractModule;
import play.libs.pekko.PekkoGuiceSupport;
public class MyModule extends AbstractModule implements PekkoGuiceSupport {
@Override
protected void configure() {
bindActor(ConfiguredActor.class, "configured-actor");
}
}
此 Actor 将被命名为 configured-actor
,并且还将使用 configured-actor
名称进行注入限定。现在,您可以在控制器和其他组件中依赖此 Actor
import org.apache.pekko.actor.ActorRef;
import play.mvc.*;
import scala.jdk.javaapi.FutureConverters;
import javax.inject.Inject;
import javax.inject.Named;
import java.util.concurrent.CompletionStage;
import static org.apache.pekko.pattern.Patterns.ask;
public class Application extends Controller {
private ActorRef configuredActor;
@Inject
public Application(@Named("configured-actor") ActorRef configuredActor) {
this.configuredActor = configuredActor;
}
public CompletionStage<Result> getConfig() {
return FutureConverters.asJava(
ask(configuredActor, new ConfiguredActorProtocol.GetConfig(), 1000))
.thenApply(response -> ok((String) response));
}
}
§依赖注入子 Actor
以上方法适用于注入根 Actor,但你创建的许多 Actor 都是子 Actor,它们不绑定到 Play 应用程序的生命周期,并且可能传递额外的状态。
为了帮助依赖注入子 Actor,Play 利用了 Guice 的 AssistedInject 支持。
假设你有一个依赖于注入配置和键的 Actor:
import com.google.inject.assistedinject.Assisted;
import com.typesafe.config.Config;
import javax.inject.Inject;
import org.apache.pekko.actor.AbstractActor;
public class ConfiguredChildActor extends AbstractActor {
private final Config configuration;
private final String key;
@Inject
public ConfiguredChildActor(Config configuration, @Assisted String key) {
this.configuration = configuration;
this.key = key;
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ConfiguredChildActorProtocol.GetConfig.class, this::getConfig)
.build();
}
private void getConfig(ConfiguredChildActorProtocol.GetConfig get) {
sender().tell(configuration.getString(key), self());
}
}
在本例中,我们使用了构造函数注入 - Guice 的辅助注入支持仅与构造函数注入兼容。由于 key
参数是在创建时提供的,而不是由容器提供的,因此我们用 @Assisted
注解了它。
现在,在子 Actor 的协议中,我们定义了一个 Factory
接口,它接受 key
并返回 Actor
。
import org.apache.pekko.actor.Actor;
public class ConfiguredChildActorProtocol {
public static class GetConfig {}
public interface Factory {
public Actor create(String key);
}
}
我们不会实现它,Guice 会为我们实现,提供一个不仅传递 key
参数,而且还会定位 Configuration
依赖项并注入它的实现。由于该特征只返回一个 Actor
,因此在测试该 Actor 时,我们可以注入一个返回任何 Actor 的工厂,例如,这允许我们注入一个模拟的子 Actor,而不是实际的子 Actor。
现在,依赖于此 Actor 的 Actor 可以扩展 InjectedActorSupport
,并且它可以依赖于我们创建的工厂。
import javax.inject.Inject;
import org.apache.pekko.actor.AbstractActor;
import org.apache.pekko.actor.ActorRef;
import play.libs.pekko.InjectedActorSupport;
public class ParentActor extends AbstractActor implements InjectedActorSupport {
private ConfiguredChildActorProtocol.Factory childFactory;
@Inject
public ParentActor(ConfiguredChildActorProtocol.Factory childFactory) {
this.childFactory = childFactory;
}
@Override
public Receive createReceive() {
return receiveBuilder().match(ParentActorProtocol.GetChild.class, this::getChild).build();
}
private void getChild(ParentActorProtocol.GetChild msg) {
String key = msg.key;
ActorRef child = injectedChild(() -> childFactory.create(key), key);
sender().tell(child, self());
}
}
它使用 injectedChild
创建并获取对子 Actor 的引用,传入键。第二个参数(在本例中为 key
)将用作子 Actor 的名称。
最后,我们需要绑定我们的 Actor。在我们的模块中,我们使用 bindActorFactory
方法绑定父 Actor,并将子工厂绑定到子实现。
import com.google.inject.AbstractModule;
import play.libs.pekko.PekkoGuiceSupport;
public class MyModule extends AbstractModule implements PekkoGuiceSupport {
@Override
protected void configure() {
bindActor(ParentActor.class, "parent-actor");
bindActorFactory(ConfiguredChildActor.class, ConfiguredChildActorProtocol.Factory.class);
}
}
这将使 Guice 自动绑定 ConfiguredChildActorProtocol.Factory
的实例,该实例将在实例化时为 ConfiguredChildActor
提供 Configuration
的实例。
§配置
默认的 Actor 系统配置是从 Play 应用程序配置文件中读取的。例如,要配置应用程序 Actor 系统的默认调度器,请将以下行添加到 conf/application.conf
文件中
pekko.actor.default-dispatcher.fork-join-executor.parallelism-max = 64
pekko.actor.debug.receive = on
有关 Pekko 日志记录配置,请参阅 配置日志记录。
§内置 Actor 系统名称
默认情况下,Play Actor 系统的名称为 application
。你可以通过 conf/application.conf
中的条目更改此名称。
play.pekko.actor-system = "custom-name"
注意:如果你想将 Play 应用程序
ActorSystem
放入 Pekko 集群中,此功能很有用。
§使用你自己的 Actor 系统
虽然我们建议你使用内置的 Actor 系统,因为它会设置所有内容,例如正确的类加载器、生命周期钩子等,但没有什么能阻止你使用你自己的 Actor 系统。但是,务必确保你执行以下操作
- 注册一个 停止钩子,以便在 Play 关闭时关闭 Actor 系统
- 从 Play 环境 传入正确的类加载器,否则 Pekko 将无法找到你的应用程序类
- 请确保您不要从默认的
pekko
配置文件中读取 Pekko 配置,该配置文件已被 Play 的 Actor 系统使用,因为这会导致问题,例如当系统尝试绑定到相同的远程端口时。
§异步执行代码块
Pekko 中一个常见的用例是让一些计算并发执行,而不需要 Actor 的额外实用程序。如果您发现自己创建了一个 Actor 池,仅仅是为了并行执行计算,那么有一个更简单(更快)的方法。
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import play.mvc.*;
public class Application extends Controller {
public CompletionStage<Result> index() {
return CompletableFuture.supplyAsync(this::longComputation)
.thenApply((Integer i) -> ok("Got " + i));
}
}
§Pekko 协调关闭
Play 使用 Pekko 的 协调关闭 来处理 Application
和 Server
的关闭。在 协调关闭 通用部分中查找更多信息。
注意:Play 只处理其内部 ActorSystem
的关闭。如果您使用额外的 Actor 系统,请确保它们都已终止,并随时将您的终止代码迁移到 协调关闭。
§Pekko 集群
您可以让您的 Play 应用程序加入一个现有的 Pekko 集群。在这种情况下,建议您优雅地离开集群。Play 附带了 Pekko 的协调关闭,它将负责优雅地离开。
如果您已经拥有自定义的集群离开代码,建议您将其替换为 Pekko 的处理。有关更多详细信息,请参阅 Pekko 文档。
§更新 Pekko 版本
如果您想使用一个 Play 尚未使用的较新版本的 Pekko,您可以在您的 build.sbt
文件中添加以下内容
// The newer Pekko version you want to use.
val pekkoVersion = "1.0.0"
// Pekko dependencies used by Play
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-actor" % pekkoVersion,
"org.apache.pekko" %% "pekko-actor-typed" % pekkoVersion,
"org.apache.pekko" %% "pekko-stream" % pekkoVersion,
"org.apache.pekko" %% "pekko-slf4j" % pekkoVersion,
"org.apache.pekko" %% "pekko-serialization-jackson" % pekkoVersion,
// Only if you are using Pekko Testkit
"org.apache.pekko" %% "pekko-testkit" % pekkoVersion
)
当然,其他 Pekko 工件可以被传递地添加。使用 sbt-dependency-graph 更好地检查您的构建并检查您需要显式添加哪些工件。
如果您尚未切换到 Netty 服务器后端,因此正在使用 Play 的默认 Pekko HTTP 服务器后端,您还需要更新 Pekko HTTP。因此,您还需要显式添加其依赖项
// The newer Pekko HTTP version you want to use.
val pekkoHTTPVersion = "1.0.0"
// Pekko HTTP dependency used by Play
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http-core" % pekkoHTTPVersion
)
注意:在进行此类更新时,请记住您需要遵循 Pekko 的 二进制兼容性规则。如果您手动添加其他 Pekko 工件,请记住保持所有 Pekko 工件的版本一致,因为 混合版本是不允许的。
注意:在解析依赖项时,sbt 将获取为该项目声明的或传递添加的最新依赖项。这意味着,如果 Play 依赖于比您声明的版本更新的 Pekko(或 Pekko HTTP)版本,则 Play 的版本将获胜。有关 sbt 如何在此处进行排除 的更多详细信息,请参阅此处。
下一步:使用消息进行国际化
发现文档中的错误? 此页面的源代码可以在 此处 找到。 阅读完 文档指南 后,请随时贡献拉取请求。 有问题或建议要分享? 转到 我们的社区论坛 与社区开始对话。