tokio_util/io/write_all_vectored.rs
1use tokio::io::AsyncWrite;
2
3use pin_project_lite::pin_project;
4use std::marker::PhantomPinned;
5use std::pin::Pin;
6use std::task::{ready, Context, Poll};
7use std::{future::Future, io::IoSlice};
8use std::{io, mem};
9
10pin_project! {
11 /// A future that writes all data from multiple buffers to a writer.
12 #[derive(Debug)]
13 #[must_use = "futures do nothing unless you `.await` or poll them"]
14 pub struct WriteAllVectored<'a, 'b, W: ?Sized> {
15 writer: &'a mut W,
16 bufs: &'a mut [IoSlice<'b>],
17 // Make this future `!Unpin` for compatibility with async trait methods.
18 #[pin]
19 _pin: PhantomPinned,
20 }
21}
22/// Like [`write_all`] but writes all data from multiple buffers into this writer.
23///
24/// This function writes multiple (possibly non-contiguous) buffers into the writer,
25/// using the `writev` syscall to potentially write in a single system call.
26///
27/// Equivalent to:
28///
29/// ```ignore
30/// async fn write_all_vectored<W: AsyncWrite + Unpin + ?Sized>(
31/// writer: &mut W,
32/// mut bufs: &mut [IoSlice<'_>]
33/// ) -> io::Result<()> {
34/// while !bufs.is_empty() {
35/// let n = write_vectored(writer, bufs).await?;
36/// if n == 0 {
37/// return Err(io::ErrorKind::WriteZero.into());
38/// }
39/// IoSlice::advance_slices(&mut bufs, n);
40/// }
41/// Ok(())
42/// }
43/// ```
44///
45/// # Cancel safety
46///
47/// This method is not cancellation safe. If it is used as the event
48/// in a `tokio::select!` statement and some other
49/// branch completes first, then the provided buffer may have been
50/// partially written, but future calls to `write_all_vectored` will
51/// have lost its place in the buffer.
52///
53/// # Examples
54///
55/// ```rust
56/// use tokio_util::io::write_all_vectored;
57/// use std::io::IoSlice;
58///
59/// #[tokio::main(flavor = "current_thread")]
60/// async fn main() -> std::io::Result<()> {
61///
62/// let mut writer = Vec::new();
63/// let bufs = &mut [
64/// IoSlice::new(&[1]),
65/// IoSlice::new(&[2, 3]),
66/// IoSlice::new(&[4, 5, 6]),
67/// ];
68///
69/// write_all_vectored(&mut writer, bufs).await?;
70///
71/// // Note: `bufs` has been modified by `IoSlice::advance_slices` and should not be reused.
72/// assert_eq!(writer, &[1, 2, 3, 4, 5, 6]);
73/// Ok(())
74/// }
75/// ```
76///
77/// # Notes
78///
79/// See the documentation for [`Write::write_all_vectored`] from std.
80/// After calling this function, the buffer slices may have
81/// been advanced and should not be reused.
82///
83/// [`Write::write_all_vectored`]: std::io::Write::write_all_vectored
84/// [`write_all`]: tokio::io::AsyncWriteExt::write_all
85/// [`writev`]: https://man7.org/linux/man-pages/man3/writev.3p.html
86pub fn write_all_vectored<'a, 'b, W>(
87 writer: &'a mut W,
88 bufs: &'a mut [IoSlice<'b>],
89) -> WriteAllVectored<'a, 'b, W>
90where
91 W: AsyncWrite + Unpin + ?Sized,
92{
93 WriteAllVectored {
94 writer,
95 bufs,
96 _pin: PhantomPinned,
97 }
98}
99
100impl<W> Future for WriteAllVectored<'_, '_, W>
101where
102 W: AsyncWrite + Unpin + ?Sized,
103{
104 type Output = io::Result<()>;
105
106 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
107 let me = self.project();
108 while !me.bufs.is_empty() {
109 // advance to first non-empty buffer
110 let non_empty = match me.bufs.iter().position(|b| !b.is_empty()) {
111 Some(pos) => pos,
112 None => return Poll::Ready(Ok(())),
113 };
114
115 // drop empty buffers at the start
116 *me.bufs = &mut mem::take(me.bufs)[non_empty..];
117
118 let n = ready!(Pin::new(&mut *me.writer).poll_write_vectored(cx, me.bufs))?;
119 if n == 0 {
120 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
121 }
122 self::advance_slices(me.bufs, n);
123 }
124
125 Poll::Ready(Ok(()))
126 }
127}
128
// Copied from `std::io::IoSlice::advance_slices`.
// Replace with the std method when the MSRV is 1.81.0.
//
// Advances `bufs` past its first `n` bytes: every buffer that is completely
// covered by `n` is removed from the front of the slice, and the first buffer
// that remains is shortened so that it starts at the first byte which has not
// been consumed yet.
//
// Panics if `n` is greater than the combined length of all buffers.
fn advance_slices<'a>(bufs: &mut &mut [IoSlice<'a>], n: usize) {
    // Number of buffers to remove.
    let mut remove = 0;
    // Remaining length before reaching n. This prevents overflow
    // that could happen if the length of slices in `bufs` were instead
    // accumulated. Those slices may be aliased and, if they are large
    // enough, their added length may overflow a `usize`.
    let mut left = n;
    for buf in bufs.iter() {
        if let Some(remainder) = left.checked_sub(buf.len()) {
            left = remainder;
            remove += 1;
        } else {
            break;
        }
    }

    *bufs = &mut std::mem::take(bufs)[remove..];
    if let Some(first) = bufs.first_mut() {
        // The loop above stopped at this buffer because `left` is strictly
        // smaller than its length, so the range is always in bounds. The
        // first `left` bytes were consumed; keep only the tail after them.
        let buf = &first[left..];
        // Necessary due to a limitation in the borrow checker;
        // when the tokio MSRV reaches 1.81.0 this entire function
        // can be replaced with `IoSlice::advance_slices`.
        //
        // SAFETY: transmute a sub-slice of an IoSlice<'a> back to
        // the lifetime `'a`. This is safe because the underlying memory
        // is guaranteed to live for 'a, we have shared access, and no
        // underlying data is reinterpreted to a different type.
        unsafe {
            *first = IoSlice::new(std::mem::transmute::<&[u8], &'a [u8]>(buf));
        }
    } else {
        assert!(left == 0, "advancing io slices beyond their length");
    }
}
165}