git @ Cat's Eye Technologies Kosheri / master src / stream.c
master

Tree @master (Download .tar.gz)

stream.c @masterraw · history · blame

/*
 * stream.c
 * Routines for communicating with stream-like processes.
 */

#include "lib.h"

#include "process.h"

#include "stream.h"

static void
read_receiver_run(struct process *self)
{
	struct value msg;
	const char *token;
	int length;
	int i;

	assert(self != NULL);
	assert(self->aux != NULL);

	while (process_dequeue(self, &msg)) {
		token = value_symbol_get_token(&msg);
		length = value_symbol_get_length(&msg);
		for (i = 0; i < length; i++) {
			((char *)self->aux)[i] = token[i];
		}
		/* XXX assert the queue is empty? */
	}
}

static struct process *
read_receiver_new(void *buffer)
{
	struct process *p;

	p = process_new();
	p->run = read_receiver_run;
	p->aux = buffer;

	return p;
}

static void
eof_receiver_run(struct process *self)
{
	struct value msg;

	assert(self != NULL);
	assert(self->aux != NULL);

	while (process_dequeue(self, &msg)) {
		*(int *)(self->aux) = value_get_boolean(&msg);
		/* XXX assert the queue is empty? */
	}
}

static struct process *
eof_receiver_new(int *result)
{
	struct process *p;

	p = process_new();
	p->run = eof_receiver_run;
	p->aux = result;

	return p;
}

void
stream_write(struct process *self, struct process *p, const void *data, unsigned int size)
{
	struct value msg, tag;

	self = self;

	value_symbol_new(&tag, "write", 5);
	value_tuple_new(&msg, &tag, 1);
	value_symbol_new(value_tuple_fetch(&msg, 0), data, size);
	process_enqueue(p, &msg);
	process_run(p);
}

void
stream_read(struct process *self, struct process *p, void *buffer, unsigned int size)
{
	struct value msg, tag;
	struct process *receiver;

	if (self == NULL) {
		receiver = read_receiver_new(buffer);
	} else {
		receiver = self;
	}

	value_symbol_new(&tag, "read", 4);
	value_tuple_new(&msg, &tag, 2);
	value_process_set(value_tuple_fetch(&msg, 0), receiver);
	value_integer_set(value_tuple_fetch(&msg, 1), size);
	process_enqueue(p, &msg);
	process_run(p);

	if (receiver == self) {
		return;
	}

	process_run(receiver);
	process_free(receiver);
}

int
stream_is_at_end(struct process *self, struct process *p)
{
	struct value msg, tag;
	struct process *receiver;
	int result;

	if (self == NULL) {
		receiver = eof_receiver_new(&result);
	} else {
		receiver = self;
	}

	value_symbol_new(&tag, "eof", 3);
	value_tuple_new(&msg, &tag, 1);
	value_process_set(value_tuple_fetch(&msg, 0), receiver);
	process_enqueue(p, &msg);
	process_run(p);

	if (receiver == self) {
		return 0;
	}

	process_run(receiver);
	process_free(receiver);

	return result;
}

void
stream_close(struct process *self, struct process *p)
{
	struct value msg, tag;

	self = self;

	value_symbol_new(&tag, "close", 5);
	value_tuple_new(&msg, &tag, 0);
	process_enqueue(p, &msg);
	process_run(p);
}