You can find the complete code for this chapter on github
Codec
In this chapter, we'll implement a Codec
that converts a stream of bytes into
a stream of a messages.
Creating a new project
Let's start by creating a new project (cargo new --lib rmp-rpc-demo
), and
adding a few dependencies in Cargo.toml
:
[package]
name = "rmp-rpc-demo"
version = "0.0.1"
authors = ["You <you@example.com>"]
description = "a msgpack-rpc client and server based on tokio"
[dependencies]
bytes = "0.4"
rmpv = "0.4"
tokio-io = "0.1"
We'll split the crate in several modules:
.
├── Cargo.toml
├── LICENSE-MIT
├── README.md
└── src
├── lib.rs
├── client.rs
├── codec.rs
├── errors.rs
├── message.rs
├── protocol.rs
└── server.rs
Create the files: touch src/{client.rs,codec.rs,errors.rs,message.rs,protocol.rs,server.rs}
. And in
src/lib.rs
we'll add the dependencies we'll need for this chapter, and
declare our modules:
# #![allow(unused_variables)] #fn main() { extern crate bytes; extern crate rmpv; extern crate tokio_io; mod client; mod codec; mod errors; mod server; pub mod message; pub mod protocol; #}
Representing messages
The first thing to do, if we want to convert a stream of bytes into a stream of messages is to implement a data structure that represents a MessagePack-RPC message.
The MessagePack-RPC
specifications
defines three type of messages: requests, notifications, and responses. Let's
naively translate this to Rust. In src/message.rs
:
# #![allow(unused_variables)] #fn main() { // src/message.rs // the specs defines a request as [type, msgid, method, params] pub struct Request { // this field must be 0 pub type: u32, pub id: u32, pub method: String, // this field is an "array of arbitrary objects" pub params: Vec<???>, } // the specs defines a response as [type, msgid, method, params] pub struct Response { // this field must be 1 pub type: u32, pub id: u32, pub error: Option<u32>, // this field is a arbitrary object pub result: ???, } pub struct Notification { // this field must be 2 pub type: u32, pub method: String, // an array of arbitrary value pub params: Vec<???>, } #}
This can be improved in multiple ways:
- The spec mentions "arbitrary values", but it's not clear how this translates
to Rust (hence the
???
). Fortunatelyrmpv
does have a type for "arbitrary messagepack values":Value
. - The
type
field seems redundant. The rust type carries the same information as the MessagePack-RPCtype
field. We can get rid of it. - The
Response
type could be improved: we have aResult
type in Rust, we don't need the error field to represent a error. Let's make theresult
field aResult
. - We need one message type, not three. This can be solved with a
Message
enum.
It becomes:
# #![allow(unused_variables)] #fn main() { // src/message.rs use rmpv::Value; pub struct Request { pub id: u32, pub method: String, pub params: Vec<Value>, } pub struct Response { pub id: u32, pub result: Result<Value, Value>, } pub struct Notification { pub method: String, pub params: Vec<Value>, } pub enum Message { Request(Request), Response(Response), Notification(Notification), } #}
Codec
We now need to create a Codec
. A codec is a type that implements
tokio_io::codec::Decoder
and
tokio_io::codec::Encoder
.
Here is the skeleton of the code, in src/codec.rs
:
# #![allow(unused_variables)] #fn main() { // src/codec.rs use std::io; use tokio_io::codec::{Decoder, Encoder}; use bytes::BytesMut; use message::Message; pub struct Codec; impl Decoder for Codec { // We want the decoder to return Message items type Item = Message; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> { unimplemented!() } } impl Encoder for Codec { // We want the encoder to encode Message items type Item = Message; type Error = io::Error; fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> { unimplemented!() } } #}
Encoding
Let's focus on the encoder first. We need to somehow turn a Message
into a
stream of bytes conform to the specs.
rmpv::encode::write_value()
looks promising: it converts a Value
into a stream of bytes and write these
bytes in a writer. All we have to do, is turn our Message
into a Value
. We
can implement this directly on Message
:
# #![allow(unused_variables)] #fn main() { // src/message.rs use std::convert::From; use rmpv::{Value, Integer, Utf8String}; impl Message { // Turn the message into a MessagePack value fn as_value(&self) -> Value { match *self { Message::Request(Request { id, ref method, ref params}) => { Value::Array(vec![ Value::Integer(Integer::from(0)), Value::Integer(Integer::from(id)), Value::String(Utf8String::from(method.as_str())), Value::Array(params.clone()), ]) } Message::Response(Response { id, ref result }) => { let (error, result) = match *result { Ok(ref result) => (Value::Nil, result.to_owned()), Err(ref err) => (err.to_owned(), Value::Nil), }; Value::Array(vec![ Value::Integer(Integer::from(1)), Value::Integer(Integer::from(id)), error, result, ]) } Message::Notification(Notification {ref method, ref params}) => { Value::Array(vec![ Value::Integer(Integer::from(2)), Value::String(Utf8String::from(method.as_str())), Value::Array(params.to_owned()), ]) } } } #}
Our Encoder
implementation becomes straightforward:
# #![allow(unused_variables)] #fn main() { impl Encoder for Codec { type Item = Message; type Error = io::Error; fn encode(&mut self, msg: Self::Item, buf: &mut BytesMut) -> io::Result<()> { Ok(rmpv::encode::write_value(&mut buf.writer(), &msg.as_value())?) } } #}
Decoding
The decoder is slightly more complicated.
A naive decoder
We'll start by implementing decode
on Message
using
rmpv::decode
. Since it's
a little bit long, we'll split it in multiple methods: Message::decode
only
decodes the message type, and then delegates the rest to Request::decode
,
Response::decode
and Notification::decode
. We'll only show
Request::decode
to keep the snippets small.
# #![allow(unused_variables)] #fn main() { // src/message.rs use rmpv::decode; // ... impl Message { // ... pub fn decode<R: Read>(rd: &mut R) -> Message { let msg = decode::value::read_value(rd)?; if let Value::Array(ref array) = msg { if array.len() < 3 { // notification are the shortest message and have 3 items panic!("Message too short"); } if let Value::Integer(msg_type) = array[0] { match msg_type.as_u64() { Some(0) => Message::Request(Message::decode(array)?), Some(1) => Message::Response(Message::decode(array)?), Some(2) => Message::Notification(Message::decode(array)?), _ => panic!("Invalid message type); } } else { panic!("Could not decode message type); } } else { panic!("Value is not an array"); } } } impl Request { fn decode(array: &[Value]) -> Self { if array.len() < 4 { panic!("Too short for a request") ; } let id = if let Value::Integer(id) = array[1] { id.as_u64().and_then(|id| Some(id as u32)).unwrap(); } else { panic!("Cannot decode request ID"); }; let method = if let Value::String(ref method) = array[2] { method.as_str().and_then(|s| Some(s.to_string())).unwrap(); } else { panic!("Cannot decode request method"); }; let params = if let Value::Array(ref params) = array[3] { params.clone() } else { panic!("Cannot decode request parameters"); }; Request {id: id, method: method, params: params} } } // etc. There is a similar decode() implementation for the Notification and Response types. #}
A very naive decoder implementation could be:
# #![allow(unused_variables)] #fn main() { impl Decoder for Codec { type Item = Message; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> { let mut buf = io::Cursor::new(&src); Ok(Message::decode(&mut buf)) } } #}
Unfortunately, this won't work. That's because we have no way to know if
there are enough bytes to read in the BytesMut
buffer to decode a full
message. When tokio choses to call decode
, the buffer may even be empty. And
given our Message::decode
implementation, this will panic.
Adding error handling
If there are not enough bytes to read, we need to let tokio know that we need
more bytes. Tokio will re-call the method later, when there is more to read.
We do so by returning Ok(None)
. But how do we know we need more bytes? Error
handling! Message::decode
could return a specific error when it fails to
decode a message because it's incomplete. We can create a new DecodeError
error type in src/errors.rs
:
# #![allow(unused_variables)] #fn main() { // src/errors.rs use std::{error, fmt, io}; use rmpv::decode; #[derive(Debug)] pub enum DecodeError { // Some bytes are missing to decode a full msgpack value Truncated, // A byte sequence could not be decoded as a msgpack value, or this value is not a valid // msgpack-rpc message. Invalid, // An unknown IO error while reading a byte sequence UnknownIo(io::Error), } impl fmt::Display for DecodeError { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { error::Error::description(self).fmt(f) } } impl error::Error for DecodeError { fn description(&self) -> &str { match *self { DecodeError::Truncated => "could not read enough bytes to decode a complete message", DecodeError::UnknownIo(_) => "Unknown IO error while decoding a message", DecodeError::Invalid => "the byte sequence is not a valid msgpack-rpc message", } } fn cause(&self) -> Option<&error::Error> { match *self { DecodeError::UnknownIo(ref e) => Some(e), _ => None, } } } impl From<io::Error> for DecodeError { fn from(err: io::Error) -> DecodeError { match err.kind() { // If this occurs, it means `rmpv` was unable to read enough bytes to decode a full // MessagePack value, so we convert this error in a DecodeError::Truncated io::ErrorKind::UnexpectedEof => DecodeError::Truncated, io::ErrorKind::Other => { if let Some(cause) = err.get_ref().unwrap().cause() { if cause.description() == "type mismatch" { return DecodeError::Invalid; } } DecodeError::UnknownIo(err) } _ => DecodeError::UnknownIo(err), } } } impl From<decode::Error> for DecodeError { fn from(err: decode::Error) -> DecodeError { match err { decode::Error::InvalidMarkerRead(io_err) | decode::Error::InvalidDataRead(io_err) => From::from(io_err), } } } #}
With error handling, we can clean up our decode
methods:
# #![allow(unused_variables)] #fn main() { // src/message.rs impl Message { pub fn decode<R: Read>(rd: &mut R) -> Result<Message, DecodeError> { let msg = decode::value::read_value(rd)?; if let Value::Array(ref array) = msg { if array.len() < 3 { return Err(DecodeError::Invalid); } if let Value::Integer(msg_type) = array[0] { match msg_type.as_u64() { Some(0) => return Ok(Message::Request(Request::decode(array)?)), Some(1) => return Ok(Message::Response(Response::decode(array)?)), Some(2) => return Ok(Message::Notification(Notification::decode(array)?)), _ => return Err(DecodeError::Invalid), } } else { return Err(DecodeError::Invalid); } } else { return Err(DecodeError::Invalid); } } // ... } // We only show the implementation for Request. It is similar for Notification // and Response impl Request { fn decode(array: &[Value]) -> Result<Self, DecodeError> { if array.len() < 4 { return Err(DecodeError::Invalid); } let id = if let Value::Integer(id) = array[1] { id.as_u64().and_then(|id| Some(id as u32)).ok_or(DecodeError::Invalid)? } else { return Err(DecodeError::Invalid); }; let method = if let Value::String(ref method) = array[2] { method.as_str().and_then(|s| Some(s.to_string())).ok_or(DecodeError::Invalid)? } else { return Err(DecodeError::Invalid); }; let params = if let Value::Array(ref params) = array[3] { params.clone() } else { return Err(DecodeError::Invalid); }; Ok(Request {id: id, method: method, params: params}) } } #}
Final decoder
The actual Decoder
implementation now becomes:
# #![allow(unused_variables)] #fn main() { impl Decoder for Codec { type Item = Message; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> io::Result<Option<Self::Item>> { let res: Result<Option<Self::Item>, Self::Error>; // We keep track of how many bytes we read let position = { // We wrap the buffer into a Cursor that counts how many bytes we read let mut buf = io::Cursor::new(&src); loop { // Try to decode a message match Message::decode(&mut buf) { // We got a message, so we break out of the loop Ok(message) => { res = Ok(Some(message)); break; } Err(err) => { match err { // Not enough bytes to decode a full message. Return Ok(None) to tell // tokio to retry later when there is more to read DecodeError::Truncated => return Ok(None), // We decoded a MessagePack value, but it's not a valid message, // so we go on, and try to read another value. DecodeError::Invalid => continue, // Something went wrong, but we don't know why. // It's safer to return an error DecodeError::UnknownIo(io_err) => { res = Err(io_err); break; } } } } } buf.position() as usize }; // Remove the bytes we read from the buffer let _ = src.split_to(position); // Return the message (or the error if any) res } } #}
To sum up, the decoder is called regularly by tokio, each time with three possible outcomes:
- We are able to read a message => return
Ok(message)
and remove these bytes from the buffer. - There are not enough bytes to read a message => return
Ok(None)
to tell tokio to retry when there is more to read, and leave the buffer intact. - An error occurs if:
- An invalid MessagePack value is read => try to read the next one
- An unknown io error occurs => return
Err(the_error)