#![feature(impl_trait_in_assoc_type)]
use std::{
any::Any,
collections::HashMap,
fmt::Display,
future::{ready, Future, Ready},
marker::PhantomData,
net::IpAddr,
ops::DerefMut,
pin::Pin,
sync::Arc,
};
use btlib::{bterr, crypto::Creds, error::StringError, BlockPath, Result};
use bttp::{DeserCallback, MsgCallback, Receiver, Replier, Transmitter};
use btserde::{field_helpers::smart_ptr, from_slice, to_vec, write_to};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use tokio::{
sync::{mpsc, oneshot, Mutex, RwLock},
task::JoinHandle,
};
use uuid::Uuid;
#[macro_export]
macro_rules! declare_runtime {
($name:ident, $ip_addr:expr, $creds:expr) => {
::lazy_static::lazy_static! {
static ref $name: &'static Runtime = {
::lazy_static::lazy_static! {
static ref RUNTIME: Runtime = Runtime::_new($creds).unwrap();
static ref RECEIVER: Receiver = _new_receiver($ip_addr, $creds, &*RUNTIME);
}
let _ = &*RECEIVER;
&*RUNTIME
};
}
};
}
#[doc(hidden)]
pub fn _new_receiver<C>(ip_addr: IpAddr, creds: Arc<C>, runtime: &'static Runtime) -> Receiver
where
C: 'static + Send + Sync + Creds,
{
let callback = RuntimeCallback::new(runtime);
Receiver::new(ip_addr, creds, callback).unwrap()
}
pub struct Runtime {
path: Arc<BlockPath>,
handles: RwLock<HashMap<Uuid, ActorHandle>>,
peers: RwLock<HashMap<Arc<BlockPath>, Transmitter>>,
}
impl Runtime {
#[doc(hidden)]
pub fn _new<C: 'static + Send + Sync + Creds>(creds: Arc<C>) -> Result<Runtime> {
let path = Arc::new(creds.bind_path()?);
Ok(Runtime {
path,
handles: RwLock::new(HashMap::new()),
peers: RwLock::new(HashMap::new()),
})
}
pub fn path(&self) -> &Arc<BlockPath> {
&self.path
}
pub async fn num_running(&self) -> usize {
let guard = self.handles.read().await;
guard.len()
}
pub async fn send<T: 'static + SendMsg>(
&self,
to: ActorName,
from: ActorName,
msg: T,
) -> Result<()> {
if to.path == self.path {
let guard = self.handles.read().await;
if let Some(handle) = guard.get(&to.act_id) {
handle.send(from, msg).await
} else {
Err(bterr!("invalid actor name"))
}
} else {
let guard = self.peers.read().await;
if let Some(peer) = guard.get(&to.path) {
let buf = to_vec(&msg)?;
let wire_msg = WireMsg {
to,
from,
payload: &buf,
};
peer.send(wire_msg).await
} else {
todo!()
}
}
}
pub async fn call<T: 'static + CallMsg>(
&self,
to: ActorName,
from: ActorName,
msg: T,
) -> Result<T::Reply> {
if to.path == self.path {
let guard = self.handles.read().await;
if let Some(handle) = guard.get(&to.act_id) {
handle.call_through(from, msg).await
} else {
Err(bterr!("invalid actor name"))
}
} else {
let guard = self.peers.read().await;
if let Some(peer) = guard.get(&to.path) {
let buf = to_vec(&msg)?;
let wire_msg = WireMsg {
to,
from,
payload: &buf,
};
peer.call(wire_msg, ReplyCallback::<T>::new()).await?
} else {
todo!()
}
}
}
pub async fn resolve<'a>(&'a self, _service: &ServiceName) -> Result<ActorName> {
todo!()
}
pub async fn activate<Msg, F, Fut>(&'static self, activator: F) -> ActorName
where
Msg: 'static + CallMsg,
Fut: 'static + Send + Future<Output = ()>,
F: FnOnce(&'static Runtime, mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
{
let mut guard = self.handles.write().await;
let act_id = {
let mut act_id = Uuid::new_v4();
while guard.contains_key(&act_id) {
act_id = Uuid::new_v4();
}
act_id
};
let act_name = self.actor_name(act_id);
let (tx, rx) = mpsc::channel::<Envelope<Msg>>(MAILBOX_LIMIT);
let deliverer = {
let buffer = Arc::new(Mutex::new(Vec::<u8>::new()));
let tx = tx.clone();
let act_name = act_name.clone();
move |envelope: WireEnvelope| {
let (wire_msg, replier) = envelope.into_parts();
let result = from_slice(wire_msg.payload);
let buffer = buffer.clone();
let tx = tx.clone();
let act_name = act_name.clone();
let fut: FutureResult = Box::pin(async move {
let msg = result?;
if let Some(mut replier) = replier {
let (envelope, rx) = Envelope::new_call(act_name, msg);
tx.send(envelope).await.map_err(|_| {
bterr!("failed to deliver message. Recipient may have halted.")
})?;
match rx.await {
Ok(reply) => {
let mut guard = buffer.lock().await;
guard.clear();
write_to(&reply, guard.deref_mut())?;
let wire_reply = WireReply::Ok(&guard);
replier.reply(wire_reply).await
}
Err(err) => replier.reply_err(err.to_string(), None).await,
}
} else {
tx.send(Envelope::new_send(act_name, msg))
.await
.map_err(|_| {
bterr!("failed to deliver message. Recipient may have halted.")
})
}
});
fut
}
};
let handle = tokio::task::spawn(activator(self, rx, act_id));
let actor_handle = ActorHandle::new(handle, tx, deliverer);
guard.insert(act_id, actor_handle);
act_name
}
pub async fn register<Msg, Fut, F, G>(
&self,
_id: ServiceId,
_activator: F,
_deserializer: G,
) -> Result<()>
where
Msg: 'static + CallMsg,
Fut: 'static + Send + Future<Output = ()>,
F: Fn(mpsc::Receiver<Envelope<Msg>>, Uuid) -> Fut,
G: 'static + Send + Sync + Fn(&[u8]) -> Result<Msg>,
{
todo!()
}
pub async fn take(&self, name: &ActorName) -> Result<ActorHandle> {
if name.path == self.path {
let mut guard = self.handles.write().await;
if let Some(handle) = guard.remove(&name.act_id) {
Ok(handle)
} else {
Err(RuntimeError::BadActorName(name.clone()).into())
}
} else {
Err(RuntimeError::BadActorName(name.clone()).into())
}
}
pub fn actor_name(&self, act_id: Uuid) -> ActorName {
ActorName::new(self.path.clone(), act_id)
}
}
impl Drop for Runtime {
fn drop(&mut self) {
panic!("A Runtime was dropped. Panicking to avoid undefined behavior.");
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeError {
BadActorName(ActorName),
}
impl Display for RuntimeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::BadActorName(name) => write!(f, "bad actor name: {name}"),
}
}
}
impl std::error::Error for RuntimeError {}
struct ReplyCallback<T> {
_phantom: PhantomData<T>,
}
impl<T: CallMsg> ReplyCallback<T> {
fn new() -> Self {
Self {
_phantom: PhantomData,
}
}
}
impl<T: CallMsg> DeserCallback for ReplyCallback<T> {
type Arg<'de> = WireReply<'de> where T: 'de;
type Return = Result<T::Reply>;
type CallFut<'de> = Ready<Self::Return> where T: 'de, T::Reply: 'de;
fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
let result = match arg {
WireReply::Ok(slice) => from_slice(slice).map_err(|err| err.into()),
WireReply::Err(msg) => Err(StringError::new(msg.to_string()).into()),
};
ready(result)
}
}
struct SendReplyCallback {
replier: Option<Replier>,
}
impl SendReplyCallback {
fn new(replier: Replier) -> Self {
Self {
replier: Some(replier),
}
}
}
impl DeserCallback for SendReplyCallback {
type Arg<'de> = WireReply<'de>;
type Return = Result<()>;
type CallFut<'de> = impl 'de + Future<Output = Self::Return>;
fn call<'de>(&'de mut self, arg: Self::Arg<'de>) -> Self::CallFut<'de> {
async move {
if let Some(mut replier) = self.replier.take() {
replier.reply(arg).await
} else {
Ok(())
}
}
}
}
#[derive(Clone)]
struct RuntimeCallback {
rt: &'static Runtime,
}
impl RuntimeCallback {
fn new(rt: &'static Runtime) -> Self {
Self { rt }
}
async fn deliver_local(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
let guard = self.rt.handles.read().await;
if let Some(handle) = guard.get(&msg.to.act_id) {
let envelope = if let Some(replier) = replier {
WireEnvelope::Call { msg, replier }
} else {
WireEnvelope::Send { msg }
};
(handle.deliverer)(envelope).await
} else {
Err(bterr!("invalid actor name: {}", msg.to))
}
}
async fn route_msg(&self, msg: WireMsg<'_>, replier: Option<Replier>) -> Result<()> {
let guard = self.rt.peers.read().await;
if let Some(tx) = guard.get(msg.to.path()) {
if let Some(replier) = replier {
let callback = SendReplyCallback::new(replier);
tx.call(msg, callback).await?
} else {
tx.send(msg).await
}
} else {
Err(bterr!(
"unable to deliver message to peer at '{}'",
msg.to.path()
))
}
}
}
impl MsgCallback for RuntimeCallback {
type Arg<'de> = WireMsg<'de>;
type CallFut<'de> = impl 'de + Future<Output = Result<()>>;
fn call<'de>(&'de self, arg: bttp::MsgReceived<Self::Arg<'de>>) -> Self::CallFut<'de> {
async move {
let (_, body, replier) = arg.into_parts();
if body.to.path() == self.rt.path() {
self.deliver_local(body, replier).await
} else {
self.route_msg(body, replier).await
}
}
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ServiceId(#[serde(with = "smart_ptr")] Arc<String>);
impl From<String> for ServiceId {
fn from(value: String) -> Self {
Self(Arc::new(value))
}
}
impl<'a> From<&'a str> for ServiceId {
fn from(value: &'a str) -> Self {
Self(Arc::new(value.to_owned()))
}
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ServiceName {
#[serde(with = "smart_ptr")]
path: Arc<BlockPath>,
service_id: ServiceId,
}
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Serialize, Deserialize)]
pub struct ActorName {
#[serde(with = "smart_ptr")]
path: Arc<BlockPath>,
act_id: Uuid,
}
impl ActorName {
pub fn new(path: Arc<BlockPath>, act_id: Uuid) -> Self {
Self { path, act_id }
}
pub fn path(&self) -> &Arc<BlockPath> {
&self.path
}
pub fn act_id(&self) -> Uuid {
self.act_id
}
}
impl Display for ActorName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}@{}", self.act_id, self.path)
}
}
pub trait CallMsg: Serialize + DeserializeOwned + Send + Sync {
type Reply: Serialize + DeserializeOwned + Send + Sync;
}
pub trait SendMsg: CallMsg {}
const MAILBOX_LIMIT: usize = 32;
#[derive(Serialize, Deserialize)]
struct WireMsg<'a> {
to: ActorName,
from: ActorName,
payload: &'a [u8],
}
impl<'a> bttp::CallMsg<'a> for WireMsg<'a> {
type Reply<'r> = WireReply<'r>;
}
impl<'a> bttp::SendMsg<'a> for WireMsg<'a> {}
#[derive(Serialize, Deserialize)]
enum WireReply<'a> {
Ok(&'a [u8]),
Err(&'a str),
}
enum WireEnvelope<'de> {
Send { msg: WireMsg<'de> },
Call { msg: WireMsg<'de>, replier: Replier },
}
impl<'de> WireEnvelope<'de> {
fn into_parts(self) -> (WireMsg<'de>, Option<Replier>) {
match self {
Self::Send { msg } => (msg, None),
Self::Call { msg, replier } => (msg, Some(replier)),
}
}
}
pub struct Envelope<T: CallMsg> {
from: ActorName,
reply: Option<oneshot::Sender<T::Reply>>,
msg: T,
}
impl<T: CallMsg> Envelope<T> {
pub fn new(msg: T, reply: Option<oneshot::Sender<T::Reply>>, from: ActorName) -> Self {
Self { from, reply, msg }
}
fn new_send(from: ActorName, msg: T) -> Self {
Self {
from,
msg,
reply: None,
}
}
fn new_call(from: ActorName, msg: T) -> (Self, oneshot::Receiver<T::Reply>) {
let (tx, rx) = oneshot::channel::<T::Reply>();
let envelope = Self {
from,
msg,
reply: Some(tx),
};
(envelope, rx)
}
pub fn from(&self) -> &ActorName {
&self.from
}
pub fn msg(&self) -> &T {
&self.msg
}
pub fn reply(&mut self, reply: T::Reply) -> Result<()> {
if let Some(tx) = self.reply.take() {
if tx.send(reply).is_ok() {
Ok(())
} else {
Err(bterr!("failed to send reply"))
}
} else {
Err(bterr!("reply already sent"))
}
}
pub fn needs_reply(&self) -> bool {
self.reply.is_some()
}
pub fn split(self) -> (T, Option<oneshot::Sender<T::Reply>>, ActorName) {
(self.msg, self.reply, self.from)
}
}
type FutureResult = Pin<Box<dyn Send + Future<Output = Result<()>>>>;
pub struct ActorHandle {
handle: Option<JoinHandle<()>>,
sender: Box<dyn Send + Sync + Any>,
deliverer: Box<dyn Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult>,
}
impl ActorHandle {
fn new<T, F>(handle: JoinHandle<()>, sender: mpsc::Sender<Envelope<T>>, deliverer: F) -> Self
where
T: 'static + CallMsg,
F: 'static + Send + Sync + Fn(WireEnvelope<'_>) -> FutureResult,
{
Self {
handle: Some(handle),
sender: Box::new(sender),
deliverer: Box::new(deliverer),
}
}
fn sender<T: 'static + CallMsg>(&self) -> Result<&mpsc::Sender<Envelope<T>>> {
self.sender
.downcast_ref::<mpsc::Sender<Envelope<T>>>()
.ok_or_else(|| bterr!("unexpected message type"))
}
pub async fn send<T: 'static + SendMsg>(&self, from: ActorName, msg: T) -> Result<()> {
let sender = self.sender()?;
sender
.send(Envelope::new_send(from, msg))
.await
.map_err(|_| bterr!("failed to enqueue message"))?;
Ok(())
}
pub async fn call_through<T: 'static + CallMsg>(
&self,
from: ActorName,
msg: T,
) -> Result<T::Reply> {
let sender = self.sender()?;
let (envelope, rx) = Envelope::new_call(from, msg);
sender
.send(envelope)
.await
.map_err(|_| bterr!("failed to enqueue call"))?;
let reply = rx.await?;
Ok(reply)
}
pub async fn returned(&mut self) -> Result<()> {
if let Some(handle) = self.handle.take() {
handle.await?;
}
Ok(())
}
pub fn abort(&mut self) {
if let Some(handle) = self.handle.take() {
handle.abort();
}
}
}
impl Drop for ActorHandle {
fn drop(&mut self) {
self.abort();
}
}
#[cfg(test)]
mod tests {
use super::*;
use btlib::{
crypto::{ConcreteCreds, CredStore, CredsPriv},
log::BuilderExt,
};
use btlib_tests::TEST_STORE;
use bttp::BlockAddr;
use btserde::to_vec;
use ctor::ctor;
use lazy_static::lazy_static;
use std::{
net::{IpAddr, Ipv4Addr},
sync::atomic::{AtomicU8, Ordering},
time::{Duration, Instant},
};
use tokio::runtime::Builder;
const RUNTIME_ADDR: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
lazy_static! {
static ref RUNTIME_CREDS: Arc<ConcreteCreds> = TEST_STORE.node_creds().unwrap();
}
declare_runtime!(RUNTIME, RUNTIME_ADDR, RUNTIME_CREDS.clone());
lazy_static! {
static ref ASYNC_RT: tokio::runtime::Runtime = Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
}
const LOG_LEVEL: &str = "warn";
#[ctor]
fn ctor() {
std::env::set_var("RUST_LOG", format!("{},quinn=WARN", LOG_LEVEL));
env_logger::Builder::from_default_env().btformat().init();
}
#[derive(Serialize, Deserialize)]
struct EchoMsg(String);
impl CallMsg for EchoMsg {
type Reply = EchoMsg;
}
async fn echo(
_rt: &'static Runtime,
mut mailbox: mpsc::Receiver<Envelope<EchoMsg>>,
_act_id: Uuid,
) {
while let Some(envelope) = mailbox.recv().await {
let (msg, replier, ..) = envelope.split();
if let Some(replier) = replier {
if let Err(_) = replier.send(msg) {
panic!("failed to send reply");
}
}
}
}
#[test]
fn local_call() {
ASYNC_RT.block_on(async {
const EXPECTED: &str = "hello";
let name = RUNTIME.activate(echo).await;
let from = ActorName::new(name.path().clone(), Uuid::default());
let reply = RUNTIME
.call(name.clone(), from, EchoMsg(EXPECTED.into()))
.await
.unwrap();
assert_eq!(EXPECTED, reply.0);
RUNTIME.take(&name).await.unwrap();
})
}
#[test]
fn remote_call() {
ASYNC_RT.block_on(async {
const EXPECTED: &str = "hello";
let actor_name = RUNTIME.activate(echo).await;
let bind_path = Arc::new(RUNTIME_CREDS.bind_path().unwrap());
let block_addr = Arc::new(BlockAddr::new(RUNTIME_ADDR, bind_path));
let transmitter = Transmitter::new(block_addr, RUNTIME_CREDS.clone())
.await
.unwrap();
let buf = to_vec(&EchoMsg(EXPECTED.to_string())).unwrap();
let wire_msg = WireMsg {
to: actor_name.clone(),
from: RUNTIME.actor_name(Uuid::default()),
payload: &buf,
};
let reply = transmitter
.call(wire_msg, ReplyCallback::<EchoMsg>::new())
.await
.unwrap()
.unwrap();
assert_eq!(EXPECTED, reply.0);
RUNTIME.take(&actor_name).await.unwrap();
});
}
#[tokio::test]
async fn num_running() {
declare_runtime!(
LOCAL_RT,
IpAddr::from([127, 0, 0, 2]),
TEST_STORE.node_creds().unwrap()
);
assert_eq!(0, LOCAL_RT.num_running().await);
let name = LOCAL_RT.activate(echo).await;
assert_eq!(1, LOCAL_RT.num_running().await);
LOCAL_RT.take(&name).await.unwrap();
assert_eq!(0, LOCAL_RT.num_running().await);
}
struct Activate {
rt: &'static Runtime,
act_id: Uuid,
}
#[derive(Serialize, Deserialize)]
struct Ping;
impl CallMsg for Ping {
type Reply = ();
}
impl SendMsg for Ping {}
#[derive(Serialize, Deserialize)]
struct Pong;
impl CallMsg for Pong {
type Reply = ();
}
impl SendMsg for Pong {}
trait ClientInit {
type AfterActivate: SentPing;
type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
}
trait SentPing {
type AfterPong: Returned;
type HandlePongFut: Future<Output = Result<Self::AfterPong>>;
fn handle_pong(self, msg: Envelope<Pong>) -> Self::HandlePongFut;
}
trait ServerInit {
type AfterActivate: Listening;
type HandleActivateFut: Future<Output = Result<Self::AfterActivate>>;
fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut;
}
trait Listening {
type AfterPing: Returned;
type HandlePingFut: Future<Output = Result<Self::AfterPing>>;
fn handle_ping(self, msg: Envelope<Ping>) -> Self::HandlePingFut;
}
trait Returned {}
struct ReturnedState;
impl Returned for ReturnedState {}
trait PingProtocol {
type Client: ClientInit;
type Server: ServerInit;
}
#[derive(Serialize, Deserialize)]
enum PingProtocolMsg {
Ping(Ping),
Pong(Pong),
}
impl CallMsg for PingProtocolMsg {
type Reply = ();
}
impl SendMsg for PingProtocolMsg {}
#[allow(dead_code)]
enum PingClientState {
Init(ClientInitState),
SentPing(ClientState),
Returned(ReturnedState),
}
#[allow(dead_code)]
enum PingServerState {
ServerInit(ServerInitState),
Listening(ServerState),
Returned(ReturnedState),
}
struct ClientInitState {
server_name: ActorName,
}
struct ClientState;
impl ClientInit for ClientInitState {
type AfterActivate = ClientState;
type HandleActivateFut = impl Future<Output = Result<Self::AfterActivate>>;
fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut {
async move {
let from = msg.rt.actor_name(msg.act_id);
msg.rt
.send(self.server_name, from, PingProtocolMsg::Ping(Ping))
.await?;
Ok(ClientState)
}
}
}
impl SentPing for ClientState {
type AfterPong = ReturnedState;
type HandlePongFut = Ready<Result<Self::AfterPong>>;
fn handle_pong(self, _msg: Envelope<Pong>) -> Self::HandlePongFut {
ready(Ok(ReturnedState))
}
}
struct ServerInitState;
struct ServerState {
rt: &'static Runtime,
act_id: Uuid,
}
impl ServerInit for ServerInitState {
type AfterActivate = ServerState;
type HandleActivateFut = Ready<Result<Self::AfterActivate>>;
fn handle_activate(self, msg: Activate) -> Self::HandleActivateFut {
ready(Ok(ServerState {
rt: msg.rt,
act_id: msg.act_id,
}))
}
}
impl Listening for ServerState {
type AfterPing = ReturnedState;
type HandlePingFut = impl Future<Output = Result<Self::AfterPing>>;
fn handle_ping(self, msg: Envelope<Ping>) -> Self::HandlePingFut {
async move {
let to = msg.from;
let from = self.rt.actor_name(self.act_id);
self.rt.send(to, from, PingProtocolMsg::Pong(Pong)).await?;
Ok(ReturnedState)
}
}
}
async fn ping_server(
counter: Arc<AtomicU8>,
rt: &'static Runtime,
mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
act_id: Uuid,
) {
let mut state = {
let init = ServerInitState;
let state = init.handle_activate(Activate { rt, act_id }).await.unwrap();
PingServerState::Listening(state)
};
while let Some(envelope) = mailbox.recv().await {
let (msg, replier, from) = envelope.split();
match (state, msg) {
(PingServerState::Listening(listening_state), PingProtocolMsg::Ping(msg)) => {
let envelope = Envelope::new(msg, replier, from);
state = PingServerState::Returned(
listening_state.handle_ping(envelope).await.unwrap(),
);
}
_ => {
log::error!("Ping protocol violation.");
break;
}
}
if let PingServerState::Returned(_) = state {
break;
}
}
counter.fetch_sub(1, Ordering::SeqCst);
}
async fn ping_client(
counter: Arc<AtomicU8>,
server_name: ActorName,
rt: &'static Runtime,
mut mailbox: mpsc::Receiver<Envelope<PingProtocolMsg>>,
act_id: Uuid,
) {
let mut state = {
let init = ClientInitState { server_name };
let state = init.handle_activate(Activate { rt, act_id }).await.unwrap();
PingClientState::SentPing(state)
};
while let Some(envelope) = mailbox.recv().await {
let (msg, replier, from) = envelope.split();
match (state, msg) {
(PingClientState::SentPing(curr_state), PingProtocolMsg::Pong(msg)) => {
let envelope = Envelope::new(msg, replier, from);
state =
PingClientState::Returned(curr_state.handle_pong(envelope).await.unwrap());
}
_ => {
log::error!("Ping protocol violation.");
break;
}
}
if let PingClientState::Returned(_) = state {
break;
}
}
counter.fetch_sub(1, Ordering::SeqCst);
}
#[test]
fn ping_pong_test() {
ASYNC_RT.block_on(async {
let counter = Arc::new(AtomicU8::new(2));
let server_name = {
let counter = counter.clone();
RUNTIME
.activate(move |rt, mailbox, act_id| ping_server(counter, rt, mailbox, act_id))
.await
};
let client_name = {
let server_name = server_name.clone();
let counter = counter.clone();
RUNTIME
.activate(move |rt, mailbox, act_id| {
ping_client(counter, server_name, rt, mailbox, act_id)
})
.await
};
let deadline = Instant::now() + Duration::from_millis(500);
while counter.load(Ordering::SeqCst) > 0 && Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(20)).await;
}
RUNTIME.take(&server_name).await.unwrap();
RUNTIME.take(&client_name).await.unwrap();
});
}
}