This guide explains how to use the code generated by the Rust gRPC compiler. It covers creating stubs, performing unary and streaming RPCs, and optimizing requests and responses with protobuf "view" types.
The entry point for any RPC is the Service Stub. Stubs are generic over the
underlying transport (Channel), making them flexible and lightweight.
```rust
use grpc::Channel;
use my_service::MyServiceClientStub;
use my_service::pb::*; // Messages

// 1. Create a gRPC channel.
let channel = Channel::new("localhost:1234");
// 2. Wrap it in the generated stub.
let client = MyServiceClientStub::new(channel);
```

Note: Stubs are cheap to clone. Cloning a stub only increments a reference count on the underlying channel.
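The cheap-clone claim follows from the stub holding the channel behind a reference count. A minimal sketch of that structure, using hypothetical stand-in types (`FakeChannel` and `FakeStub` are illustrative, not the real generated code):

```rust
use std::sync::Arc;

// A stand-in for the transport; in the real generated code this would
// hold connection state.
struct FakeChannel {
    target: String,
}

// The stub only holds an Arc to the channel, so deriving Clone makes
// cloning a reference-count bump, not a copy of any connection state.
#[derive(Clone)]
struct FakeStub {
    channel: Arc<FakeChannel>,
}

impl FakeStub {
    fn new(channel: FakeChannel) -> Self {
        FakeStub { channel: Arc::new(channel) }
    }

    // Expose the count so the effect of cloning is observable.
    fn refcount(&self) -> usize {
        Arc::strong_count(&self.channel)
    }
}
```

Cloning `FakeStub` into several tasks shares one channel; each clone sees the same `target` and the strong count grows by one per clone.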
For every message defined in your .proto file, the protobuf message compiler
generates three Rust types: owned messages, mutable views, and immutable views.
Use these standard structs when you need to store data or pass ownership. They own their heap allocations (e.g., String, Vec<u8>).
```rust
let request = proto!(MyRequest {
    query: "search_term".to_string(),
    page: 1,
});
```

The view types represent views into the owned messages. For performance reasons, the gRPC stub's unary and server-streaming methods accept `protobuf::AsView` for requests, which is implemented by both owned messages (`MyRequest`) and views (`MyRequestView`). Unary and client-streaming methods also provide an optional builder method accepting `protobuf::AsMut`, for when the memory backing the response message needs to be controlled.
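The owned/view split can be sketched with a minimal trait. All names here (`Request`, `RequestView`, `AsView`, `query_len`) are illustrative stand-ins for the generated protobuf API, not the real types:

```rust
// An owned message: owns its heap allocations.
struct Request {
    query: String,
}

// An immutable view: borrows the data instead of owning it.
#[derive(Clone, Copy)]
struct RequestView<'a> {
    query: &'a str,
}

// Both the owned message and the view can produce a view,
// mirroring how stub methods accept `protobuf::AsView`.
trait AsView {
    fn as_view(&self) -> RequestView<'_>;
}

impl AsView for Request {
    fn as_view(&self) -> RequestView<'_> {
        RequestView { query: &self.query }
    }
}

impl<'a> AsView for RequestView<'a> {
    fn as_view(&self) -> RequestView<'_> {
        *self
    }
}

// Like a stub method: generic over AsView, so it takes either form.
fn query_len(req: impl AsView) -> usize {
    req.as_view().query.len()
}
```

Because `query_len` is generic over the trait, callers can pass a borrowed view (leaving the owned message usable afterward) or hand over the owned message itself.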
Unary calls utilize a Future Builder Pattern. Calling the method on the stub
prepares the call but does not start it until you await the returned future
builder.
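The laziness the builder relies on is standard Rust future semantics: constructing a future does no work until it is polled. A self-contained illustration, polling by hand with a no-op waker instead of a real runtime (none of this is part of the gRPC API):

```rust
use std::cell::Cell;
use std::future::Future;
use std::pin::pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A Waker that does nothing: sufficient to poll an always-ready future.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// Returns whether the async block's body ran before being polled.
fn ran_before_poll() -> bool {
    let started = Cell::new(false);
    // Like calling a stub method: this only *builds* the future.
    let fut = async {
        started.set(true);
        42
    };
    let eager = started.get(); // still false: nothing has executed yet

    // Awaiting is what drives the work; here we poll it by hand.
    let mut fut = pin!(fut);
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    assert_eq!(fut.as_mut().poll(&mut cx), Poll::Ready(42));
    eager
}
```

This is why dropping a never-awaited call builder is harmless: no request has been sent.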
Pass an owned message or a view, and await the result.
```rust
let response = client.unary_call(proto!(MyRequest { page: 1 })).await?;
println!("Response: {:?}", response); // Either Ok(MyResponse) or Err(StatusError)
```

Similarly, if you already have a protobuf message view:
```rust
let request = proto!(MyRequest { page: 1 });
let request_view: MyRequestView<'_> = request.as_view();
let response = client.unary_call(request_view).await?;
println!("Response: {:?}", response); // Either Ok(MyResponse) or Err(StatusError)
```

You can also provide a mutable reference to an existing message buffer in which to receive the response. This lets you reuse the same memory across multiple calls, reducing allocations, and it enables the use of protobuf messages allocated in an Arena, which cannot produce owned message types.
```rust
// Allocate the response message once, outside the call.
let mut reusable_response = MyResponse::default();
let status = client
    .unary_call(proto!(MyRequest { page: 1 }))
    .with_response_message(&mut reusable_response)
    .await;
// The response is deserialized directly into reusable_response if status is ok.
```

Server-streaming methods accept a `futures::Sink` for responses, and they return a future that resolves to the status of the RPC.
```rust
use futures::sink::unfold;

// Build a sink that prints each streamed response as it arrives.
let response_sink = Box::pin(unfold((), |_, res: MyResponse| async move {
    println!("Response: {:?}", res);
    Ok::<_, ()>(())
}));
let status = client.server_streaming(
    proto!(MyRequest { ... }),
    response_sink,
).await;
println!("Status: {:?}", status); // The final status of the RPC.
```

For client streaming, you provide a stream of requests. The input can be any type that implements `futures_core::Stream`, and the call returns a response just like a unary call.
```rust
use async_stream::stream;

// Create a stream of requests.
let input_stream = stream! {
    yield proto!(MyRequest { page: 1 });
    yield proto!(MyRequest { page: 2 });
};
let response = client.client_streaming(input_stream).await?;
println!("Response: {:?}", response); // Either Ok(MyResponse) or Err(StatusError)
```

Bidi streaming combines the patterns above: you provide an input stream and an output sink when performing the call, and the two operate concurrently.
```rust
// Create a stream of requests.
let request_stream = Box::pin(stream! {
    yield proto!(MyRequest { page: 1 });
    yield proto!(MyRequest { page: 2 });
});
// Create a sink that prints each response as it arrives.
let response_sink = Box::pin(unfold((), |_, res: MyResponse| async move {
    println!("Response: {:?}", res);
    Ok::<_, ()>(())
}));
// The request stream and response sink are driven concurrently.
let status = client.bidi_streaming(request_stream, response_sink).await;
```