1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::future::Future;
use std::intrinsics::transmute;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use crate::access::*;
use crate::client::GrpcClient;
use crate::transaction::*;

/// Repeatedly queries a client about a transaction,
/// yielding the result after it has been sealed or expired.
///
/// If an error occured while making requests, yields Err.
/// If the timeout has reached, yields Ok(None). Otherwise,
/// yields Ok(Some(transaction_result)).
pub struct Finalize<Id, C: GrpcClient<GetTransactionRequest<Id>, TransactionResultResponse>> {
    tx_id: Id,
    client: C,
    delay: Duration,
    timeout: futures_timer::Delay,
    state: FinalizeState<Id, C>,
}

impl<Id, C: GrpcClient<GetTransactionRequest<Id>, TransactionResultResponse>> Finalize<Id, C> {
    /// Creates a new instance of [`Finalize`] with the transaction's id, the client, the delay, and the timeout.
    pub fn new(tx_id: Id, mut client: C, delay: Duration, timeout: Duration) -> Self
    where
        Id: Copy,
    {
        let timeout = futures_timer::Delay::new(timeout);
        let fut = client.send(GetTransactionRequest { id: tx_id });

        // transmute PinnedBox<dyn Future + 'a> to PinnedBox<dyn Future + 'static>
        //
        // SAFETY: this is safe since we never leak the future to elsewhere.
        // Since it will always be contained in this structure, and the box is always valid if 'a is valid,
        // and 'a is valid for the entire lifetime of `Self`, the box is valid for the entire lifetime of
        // `Self`
        let fut: Pin<Box<dyn Future<Output = Result<TransactionResultResponse, C::Error>>>> =
            unsafe { transmute(fut) };
        let state = FinalizeState::Request(fut);

        Self {
            tx_id,
            client,
            delay,
            timeout,
            state,
        }
    }
}

impl<Id, C: GrpcClient<GetTransactionRequest<Id>, TransactionResultResponse>> Future
    for Finalize<Id, C>
where
    Self: Unpin,
    Id: Copy,
{
    type Output = Result<Option<TransactionResultResponse>, C::Error>;

    fn poll(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Self::Output> {
        let this = &mut *self;

        // We haven't made any progress.
        // Returns Poll::Pending if the timeout hasn't been reached.
        macro_rules! pending {
            () => {
                match Pin::new(&mut this.timeout).poll(cx) {
                    Poll::Pending => Poll::Pending,
                    // timeout has reached and we still haven't got
                    Poll::Ready(()) => Poll::Ready(Ok(None)),
                }
            };
        }
        match &mut this.state {
            FinalizeState::Request(df) => match df.as_mut().poll(cx) {
                Poll::Ready(Ok(response)) => {
                    match response.status {
                        TransactionStatus::Sealed | TransactionStatus::Expired => {
                            Poll::Ready(Ok(Some(response)))
                        }
                        // not finalized yet
                        // if the response suggests that the transaction is still ongoing, switch state to delay.
                        _ => {
                            this.state =
                                FinalizeState::Waiting(futures_timer::Delay::new(this.delay));

                            // Poll `self` again, returning if timed out.
                            self.poll(cx)
                        }
                    }
                }
                // If an error occured, return the error.
                Poll::Ready(Err(error)) => Poll::Ready(Err(error)),
                Poll::Pending => pending!(),
            },
            FinalizeState::Waiting(delay) => match Pin::new(delay).poll(cx) {
                Poll::Ready(()) => {
                    // Send another request.
                    let fut = this.client.send(GetTransactionRequest { id: this.tx_id });
                    // transmute PinnedBox<dyn Future + 'a> to PinnedBox<dyn Future + 'static>
                    //
                    // SAFETY: this is safe since we never leak the future to elsewhere.
                    // Since it will always be contained in this structure, and the box is always valid if 'a is valid,
                    // and 'a is valid for the entire lifetime of `Self`, the box is valid for the entire lifetime of
                    // `Self`
                    let fut: Pin<
                        Box<dyn Future<Output = Result<TransactionResultResponse, C::Error>>>,
                    > = unsafe { transmute(fut) };
                    self.state = FinalizeState::Request(fut);

                    // Poll `self` again, this time on the request.
                    self.poll(cx)
                }
                Poll::Pending => pending!(),
            },
        }
    }
}

enum FinalizeState<Id, C: GrpcClient<GetTransactionRequest<Id>, TransactionResultResponse>> {
    Request(Pin<Box<dyn Future<Output = Result<TransactionResultResponse, C::Error>>>>),
    Waiting(futures_timer::Delay),
}