Since pipes and sockets can be used by the ioregionfd wire protocol I decided to write a few words about how these IPC mechanisms are implemented in linux kernel. I haven’t had an opportunity to dig into pipe internals so I was wondering how pipe buffering works.

A pipe is a unidirectional data flow. A new pipe can be created with pipe() system call which returns a pair of file descriptors. The first descriptor connects to the read end of the pipe, the second connects to the write end. Data written to fd[1] can be read from fd[0]. Since wire protocol is a bi-directional two different pipes must be used. The pipes are implemented in fs/pipe.c. The kernel invokes do_pipe2() in order to create a new pipe. Here is the code:

static int do_pipe2(int __user *fildes, int flags)
{
	struct file *files[2];
	int fd[2];
	int error;

	error = __do_pipe_flags(fd, files, flags);
	if (!error) {
		if (unlikely(copy_to_user(fildes, fd, sizeof(fd)))) {
			...
		} else {
			fd_install(fd[0], files[0]);
			fd_install(fd[1], files[1]);
		}
	}
	return error;
}

__do_pipe_flags() calls create_pipe_files() function which allocates an inode, a pipe and two file objects (one for reading and the other for writing). Besides that each pipe makes use of 16 pipe buffers. A pipe buffer is a page frame. It can contain data written into the pipe and available for readers. These buffers can be seen as a one circular buffer: writing processes keep adding data to this buffer, while reading process keep removing them. First create_pipe_files() calls get_pipe_inode() function which in turn invokes new_inode_pseudo() and alloc_pipe_info() in order to create a new inode and a pipe objects. The alloc_pipe_info() function allocates a new pipe and pipe buffers. The pipe-max-size default value is 1048576 (1 MiB). It can be set by root in /proc/sys/fs/pipe-max-size.

#define PIPE_DEF_BUFFERS	16

struct pipe_inode_info *alloc_pipe_info(void)
{
	struct pipe_inode_info *pipe;
	unsigned long pipe_bufs = PIPE_DEF_BUFFERS;
	...
	unsigned int max_size = READ_ONCE(pipe_max_size);

	pipe = kzalloc(sizeof(struct pipe_inode_info), GFP_KERNEL_ACCOUNT);
	if (pipe == NULL)
		goto out_free_uid;

	if (pipe_bufs * PAGE_SIZE > max_size && !capable(CAP_SYS_RESOURCE))
		pipe_bufs = max_size >> PAGE_SHIFT;
	...
	pipe->bufs = kcalloc(pipe_bufs, sizeof(struct pipe_buffer),
			     GFP_KERNEL_ACCOUNT);

	if (pipe->bufs) {
		init_waitqueue_head(&pipe->rd_wait);
		init_waitqueue_head(&pipe->wr_wait);
		pipe->r_counter = pipe->w_counter = 1;
		pipe->max_usage = pipe_bufs;
		pipe->ring_size = pipe_bufs;
		pipe->nr_accounted = pipe_bufs;
		pipe->user = user;
		mutex_init(&pipe->mutex);
		return pipe;
	}
	...
}

After that create_pipe_files() allocates two file objects for a pipe and sets their f_op to pipefifo_fops. So if userspace process performs write() system call specifying the file descriptor for the writing end of the pipe the kernel satisfies this request by invoking the pipe_write().

const struct file_operations pipefifo_fops = {
	.open		= fifo_open,
	.llseek		= no_llseek,
	.read_iter	= pipe_read,
	.write_iter	= pipe_write,
...

Write operations involving a small number of bytes are atomically executed. If two or more processes are concurrently writing into a pipe and each write operation involving fewer than PAGE_SIZE should finish without being interleaved with write operations of other processes to the same pipe. However, write operations involving more than PAGE_SIZE may be nonatomic and may also force the calling process to sleep.

static ssize_t
pipe_write(struct kiocb *iocb, struct iov_iter *from)
{
	struct file *filp = iocb->ki_filp;
	struct pipe_inode_info *pipe = filp->private_data;
	...
	size_t total_len = iov_iter_count(from);
	...
	__pipe_lock(pipe);
	...
	for (;;) {
		...
		head = pipe->head;
		if (!pipe_full(head, pipe->tail, pipe->max_usage)) {
			unsigned int mask = pipe->ring_size - 1;
			struct pipe_buffer *buf = &pipe->bufs[head & mask];
			struct page *page = pipe->tmp_page;
			int copied;

			if (!page) {
				page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
				if (unlikely(!page)) {
					ret = ret ? : -ENOMEM;
					break;
				}
				pipe->tmp_page = page;
			}

			spin_lock_irq(&pipe->rd_wait.lock);

			head = pipe->head;
			if (pipe_full(head, pipe->tail, pipe->max_usage)) {
				spin_unlock_irq(&pipe->rd_wait.lock);
				continue;
			}

			pipe->head = head + 1;
			spin_unlock_irq(&pipe->rd_wait.lock);

			/* Insert it into the buffer array */
			buf = &pipe->bufs[head & mask];
			buf->page = page;
			buf->ops = &anon_pipe_buf_ops;
			buf->offset = 0;
			buf->len = 0;
			...
			pipe->tmp_page = NULL;

			copied = copy_page_from_iter(page, 0, PAGE_SIZE, from);
			...
			ret += copied;
			buf->offset = 0;
			buf->len = copied;

			if (!iov_iter_count(from))
				break;
		}

		if (!pipe_full(head, pipe->tail, pipe->max_usage))
			continue;

		/* Wait for buffer space to become available. */
		...
		__pipe_unlock(pipe);
		if (was_empty) {
			wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
			kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
		}
		wait_event_interruptible_exclusive(pipe->wr_wait, pipe_writable(pipe));
		__pipe_lock(pipe);
		...
	}
	...
}

If the first !pipe_full() condition is satisfied and there is no cached page a new page frame will be allocated and attached to a cached page. Then this page can be also attached to a free pipe buffer slot with holding read wait-queue spinlock. After that copy_page_from_iter() copies data into the page frame and the pipe buffer offset and length fields are updated. If not all requested bytes were written and pipe circular buffer has some free slots pipe_write() function jumps to a next loop iteration. Here write operations can become nonatomic. If pipe is full the calling process waits for buffer space to become available.