aboutsummaryrefslogtreecommitdiff
path: root/py/stream.c
diff options
context:
space:
mode:
authorPaul Sokolovsky <pfalcon@users.sourceforge.net>2016-05-18 02:40:03 +0300
committerPaul Sokolovsky <pfalcon@users.sourceforge.net>2016-05-18 02:41:45 +0300
commit7f7c84b10a5f3b2f783955b01f55e0f913ac459f (patch)
tree845d4d10cf60656ef8b3cce0993d673e51ceded4 /py/stream.c
parent92a342a0113292c93a04d04af106649bb3e51b79 (diff)
py/stream: Support both "exact size" and "one underlying call" operations.
Both read and write operations support variants where either a) a single call is made to the undelying stream implementation and returned buffer length may be less than requested, or b) calls are repeated until requested amount of data is collected, shorter amount is returned only in case of EOF or error. These operations are available from the level of C support functions to be used by other C modules to implementations of Python methods to be used in user-facing objects. The rationale of these changes is to allow to write concise and robust code to work with *blocking* streams of types prone to short reads, like serial interfaces and sockets. Particular object types may select "exact" vs "once" types of methods depending on their needs. E.g., for sockets, revc() and send() methods continue to be "once", while read() and write() thus converted to "exactly" versions. These changes don't affect non-blocking handling, e.g. trying "exact" method on the non-blocking socket will return as much data as available without blocking. No data available is continued to be signaled as None return value to read() and write(). From the point of view of CPython compatibility, this model is a cross between its io.RawIOBase and io.BufferedIOBase abstract classes. For blocking streams, it works as io.BufferedIOBase model (guaranteeing lack of short reads/writes), while for non-blocking - as io.RawIOBase, returning None in case of lack of data (instead of raising expensive exception, as required by io.BufferedIOBase). Such a cross-behavior should be optimal for MicroPython needs.
Diffstat (limited to 'py/stream.c')
-rw-r--r--py/stream.c103
1 files changed, 72 insertions, 31 deletions
diff --git a/py/stream.c b/py/stream.c
index a3df1b8fd..9b1d5fd2d 100644
--- a/py/stream.c
+++ b/py/stream.c
@@ -49,6 +49,48 @@ STATIC mp_obj_t stream_readall(mp_obj_t self_in);
#define STREAM_CONTENT_TYPE(stream) (((stream)->is_text) ? &mp_type_str : &mp_type_bytes)
+// Returns error condition in *errcode, if non-zero, return value is number of bytes written
+// before error condition occured. If *errcode == 0, returns total bytes written (which will
+// be equal to input size).
+mp_uint_t mp_stream_rw(mp_obj_t stream, void *buf_, mp_uint_t size, int *errcode, byte flags) {
+ byte *buf = buf_;
+ mp_obj_base_t* s = (mp_obj_base_t*)MP_OBJ_TO_PTR(stream);
+ typedef mp_uint_t (*io_func_t)(mp_obj_t obj, void *buf, mp_uint_t size, int *errcode);
+ io_func_t io_func;
+ if (flags & MP_STREAM_RW_WRITE) {
+ io_func = (io_func_t)s->type->stream_p->write;
+ } else {
+ io_func = s->type->stream_p->read;
+ }
+
+ *errcode = 0;
+ mp_uint_t done = 0;
+ while (size > 0) {
+ mp_uint_t out_sz = io_func(stream, buf, size, errcode);
+ // For read, out_sz == 0 means EOF. For write, it's unspecified
+ // what it means, but we don't make any progress, so returning
+ // is still the best option.
+ if (out_sz == 0) {
+ return done;
+ }
+ if (out_sz == MP_STREAM_ERROR) {
+ // If we read something before getting EAGAIN, don't leak it
+ if (mp_is_nonblocking_error(*errcode) && done != 0) {
+ *errcode = 0;
+ }
+ return done;
+ }
+ if (flags & MP_STREAM_RW_ONCE) {
+ return out_sz;
+ }
+
+ buf += out_sz;
+ size -= out_sz;
+ done += out_sz;
+ }
+ return done;
+}
+
const mp_stream_p_t *mp_get_stream_raise(mp_obj_t self_in, int flags) {
mp_obj_base_t *o = (mp_obj_base_t*)MP_OBJ_TO_PTR(self_in);
const mp_stream_p_t *stream_p = o->type->stream_p;
@@ -62,7 +104,7 @@ const mp_stream_p_t *mp_get_stream_raise(mp_obj_t self_in, int flags) {
return stream_p;
}
-STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
+STATIC mp_obj_t stream_read_generic(size_t n_args, const mp_obj_t *args, byte flags) {
const mp_stream_p_t *stream_p = mp_get_stream_raise(args[0], MP_STREAM_OP_READ);
// What to do if sz < -1? Python docs don't specify this case.
@@ -94,8 +136,8 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
nlr_raise(mp_obj_new_exception_msg_varg(&mp_type_MemoryError, "out of memory"));
}
int error;
- mp_uint_t out_sz = stream_p->read(args[0], p, more_bytes, &error);
- if (out_sz == MP_STREAM_ERROR) {
+ mp_uint_t out_sz = mp_stream_read_exactly(args[0], p, more_bytes, &error);
+ if (error != 0) {
vstr_cut_tail_bytes(&vstr, more_bytes);
if (mp_is_nonblocking_error(error)) {
// With non-blocking streams, we read as much as we can.
@@ -165,8 +207,8 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
vstr_t vstr;
vstr_init_len(&vstr, sz);
int error;
- mp_uint_t out_sz = stream_p->read(args[0], vstr.buf, sz, &error);
- if (out_sz == MP_STREAM_ERROR) {
+ mp_uint_t out_sz = mp_stream_rw(args[0], vstr.buf, sz, &error, flags);
+ if (error != 0) {
vstr_clear(&vstr);
if (mp_is_nonblocking_error(error)) {
// https://docs.python.org/3.4/library/io.html#io.RawIOBase.read
@@ -182,20 +224,27 @@ STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
return mp_obj_new_str_from_vstr(STREAM_CONTENT_TYPE(stream_p), &vstr);
}
}
+
+STATIC mp_obj_t stream_read(size_t n_args, const mp_obj_t *args) {
+ return stream_read_generic(n_args, args, MP_STREAM_RW_READ);
+}
MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(mp_stream_read_obj, 1, 2, stream_read);
-mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len) {
- const mp_stream_p_t *stream_p = mp_get_stream_raise(self_in, MP_STREAM_OP_WRITE);
+STATIC mp_obj_t stream_read1(size_t n_args, const mp_obj_t *args) {
+ return stream_read_generic(n_args, args, MP_STREAM_RW_READ | MP_STREAM_RW_ONCE);
+}
+MP_DEFINE_CONST_FUN_OBJ_VAR_BETWEEN(mp_stream_read1_obj, 1, 2, stream_read1);
+
+mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len, byte flags) {
+ mp_get_stream_raise(self_in, MP_STREAM_OP_WRITE);
int error;
- mp_uint_t out_sz = stream_p->write(self_in, buf, len, &error);
- if (out_sz == MP_STREAM_ERROR) {
+ mp_uint_t out_sz = mp_stream_rw(self_in, (void*)buf, len, &error, flags);
+ if (error != 0) {
if (mp_is_nonblocking_error(error)) {
// http://docs.python.org/3/library/io.html#io.RawIOBase.write
// "None is returned if the raw stream is set not to block and
// no single byte could be readily written to it."
- // This is for consistency with read() behavior, still weird,
- // see abobe.
return mp_const_none;
}
nlr_raise(mp_obj_new_exception_arg1(&mp_type_OSError, MP_OBJ_NEW_SMALL_INT(error)));
@@ -206,33 +255,25 @@ mp_obj_t mp_stream_write(mp_obj_t self_in, const void *buf, size_t len) {
// XXX hack
void mp_stream_write_adaptor(void *self, const char *buf, size_t len) {
- mp_stream_write(MP_OBJ_FROM_PTR(self), buf, len);
-}
-
-// Works only with blocking streams
-mp_uint_t mp_stream_writeall(mp_obj_t stream, const byte *buf, mp_uint_t size, int *errcode) {
- mp_obj_base_t* s = (mp_obj_base_t*)MP_OBJ_TO_PTR(stream);
- mp_uint_t org_size = size;
- while (size > 0) {
- mp_uint_t out_sz = s->type->stream_p->write(stream, buf, size, errcode);
- if (out_sz == MP_STREAM_ERROR) {
- return MP_STREAM_ERROR;
- }
- buf += out_sz;
- size -= out_sz;
- }
- return org_size;
+ mp_stream_write(MP_OBJ_FROM_PTR(self), buf, len, MP_STREAM_RW_WRITE);
}
STATIC mp_obj_t stream_write_method(mp_obj_t self_in, mp_obj_t arg) {
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(arg, &bufinfo, MP_BUFFER_READ);
- return mp_stream_write(self_in, bufinfo.buf, bufinfo.len);
+ return mp_stream_write(self_in, bufinfo.buf, bufinfo.len, MP_STREAM_RW_WRITE);
}
MP_DEFINE_CONST_FUN_OBJ_2(mp_stream_write_obj, stream_write_method);
+STATIC mp_obj_t stream_write1_method(mp_obj_t self_in, mp_obj_t arg) {
+ mp_buffer_info_t bufinfo;
+ mp_get_buffer_raise(arg, &bufinfo, MP_BUFFER_READ);
+ return mp_stream_write(self_in, bufinfo.buf, bufinfo.len, MP_STREAM_RW_WRITE | MP_STREAM_RW_ONCE);
+}
+MP_DEFINE_CONST_FUN_OBJ_2(mp_stream_write1_obj, stream_write1_method);
+
STATIC mp_obj_t stream_readinto(size_t n_args, const mp_obj_t *args) {
- const mp_stream_p_t *stream_p = mp_get_stream_raise(args[0], MP_STREAM_OP_READ);
+ mp_get_stream_raise(args[0], MP_STREAM_OP_READ);
mp_buffer_info_t bufinfo;
mp_get_buffer_raise(args[1], &bufinfo, MP_BUFFER_WRITE);
@@ -248,8 +289,8 @@ STATIC mp_obj_t stream_readinto(size_t n_args, const mp_obj_t *args) {
}
int error;
- mp_uint_t out_sz = stream_p->read(args[0], bufinfo.buf, len, &error);
- if (out_sz == MP_STREAM_ERROR) {
+ mp_uint_t out_sz = mp_stream_read_exactly(args[0], bufinfo.buf, len, &error);
+ if (error != 0) {
if (mp_is_nonblocking_error(error)) {
return mp_const_none;
}