Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add buffer stream #269

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
254 changes: 254 additions & 0 deletions src/tbox/stream/impl/stream/buffer.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
/*!The Treasure Box Library
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Copyright (C) 2009-present, TBOOX Open Source Group.
*
* @author A2va
* @file buffer.c
*
*/

/* //////////////////////////////////////////////////////////////////////////////////////
* includes
*/
#include "prefix.h"

/* //////////////////////////////////////////////////////////////////////////////////////
* types
*/

// the buffer stream type
typedef struct __tb_stream_buffer_t
{
// the buffer
tb_buffer_ref_t buffer;

// the head
tb_size_t head;

// the buffer is referenced?
tb_bool_t bref;

}tb_stream_buffer_t;

/* //////////////////////////////////////////////////////////////////////////////////////
* implementation
*/
static __tb_inline__ tb_stream_buffer_t* tb_stream_buffer_cast(tb_stream_ref_t stream)
{
// check
tb_assert_and_check_return_val(stream && tb_stream_type(stream) == TB_STREAM_TYPE_BUFF, tb_null);

// ok?
return (tb_stream_buffer_t*)stream;
}
static tb_bool_t tb_stream_buffer_open(tb_stream_ref_t stream)
{
// check
tb_stream_buffer_t* stream_buffer = tb_stream_buffer_cast(stream);
tb_assert_and_check_return_val(stream_buffer, tb_false);

stream_buffer->head = 0;

// ok
return tb_true;
}
static tb_bool_t tb_stream_buffer_clos(tb_stream_ref_t stream)
{
// check
tb_stream_buffer_t* stream_buffer = tb_stream_buffer_cast(stream);
tb_assert_and_check_return_val(stream_buffer, tb_false);

// clear head
stream_buffer->head = 0;

// ok
return tb_true;
}
static tb_void_t tb_stream_buffer_exit(tb_stream_ref_t stream)
{
// check
tb_stream_buffer_t* stream_buffer = tb_stream_buffer_cast(stream);
tb_assert_and_check_return(stream_buffer);

// clear head
stream_buffer->head = 0;

// exit buffer
if (stream_buffer->buffer && !stream_buffer->bref) tb_buffer_exit(stream_buffer->buffer);
stream_buffer->buffer = tb_null;
}
static tb_long_t tb_stream_buffer_read(tb_stream_ref_t stream, tb_byte_t* data, tb_size_t size)
{
// check
tb_stream_buffer_t* stream_buffer = tb_stream_buffer_cast(stream);
tb_assert_and_check_return_val(stream_buffer && stream_buffer->buffer, -1);

// check
tb_check_return_val(data, -1);
tb_check_return_val(size, 0);

// the left
tb_size_t left = tb_buffer_size(stream_buffer->buffer) - stream_buffer->head;

// the need
if (size > left) size = left;

// read data
if (size) tb_memcpy(data, tb_buffer_data(stream_buffer->buffer) + stream_buffer->head, size);

// save head
stream_buffer->head += size;

// ok?
return (tb_long_t)(size);
}
static tb_long_t tb_stream_buffer_writ(tb_stream_ref_t stream, tb_byte_t const* data, tb_size_t size)
{
// check
tb_stream_buffer_t* stream_buffer = tb_stream_buffer_cast(stream);
tb_assert_and_check_return_val(stream_buffer && stream_buffer->buffer, -1);

// check
tb_check_return_val(data, -1);
tb_check_return_val(size, 0);

// writ data
if (size) tb_buffer_memncpyp(stream_buffer->buffer, stream_buffer->head, data, size);
A2va marked this conversation as resolved.
Show resolved Hide resolved

// save head
stream_buffer->head += size;

// ok?
return size;
}
static tb_bool_t tb_stream_buffer_seek(tb_stream_ref_t stream, tb_hize_t offset)
{
// check
tb_stream_buffer_t* stream_buffer = tb_stream_buffer_cast(stream);
tb_assert_and_check_return_val(stream_buffer && offset <= tb_buffer_size(stream_buffer->buffer), tb_false);

// seek
stream_buffer->head = (tb_size_t)offset;

// ok
return tb_true;
}
static tb_long_t tb_stream_buffer_wait(tb_stream_ref_t stream, tb_size_t wait, tb_long_t timeout)
{
// check
tb_stream_buffer_t* stream_buffer = tb_stream_buffer_cast(stream);
tb_assert_and_check_return_val(stream_buffer, -1);

// wait
tb_long_t events = 0;
if (stream_buffer->head < tb_buffer_size(stream_buffer->buffer))
{
if (wait & TB_STREAM_WAIT_READ) events |= TB_STREAM_WAIT_READ;
if (wait & TB_STREAM_WAIT_WRIT) events |= TB_STREAM_WAIT_WRIT;
}
else events = -1;

// ok?
return events;
}
static tb_bool_t tb_stream_buffer_ctrl(tb_stream_ref_t stream, tb_size_t ctrl, tb_va_list_t args)
{
// check
tb_stream_buffer_t* stream_buffer = tb_stream_buffer_cast(stream);
tb_assert_and_check_return_val(stream_buffer, tb_false);

switch (ctrl)
{
case TB_STREAM_CTRL_GET_SIZE:
{
// the psize
tb_hong_t* psize = (tb_hong_t*)tb_va_arg(args, tb_hong_t*);
tb_assert_and_check_return_val(psize, tb_false);

// get size
*psize = tb_buffer_size(stream_buffer->buffer);
return tb_true;
}
break;
case TB_STREAM_CTRL_BUFF_SET_BUFFER:
{
// exit buffer first if exists
if (stream_buffer->buffer && !stream_buffer->bref) tb_buffer_exit(stream_buffer->buffer);

stream_buffer->buffer = (tb_buffer_ref_t)tb_va_arg(args, tb_buffer_ref_t);
stream_buffer->head = 0;
stream_buffer->bref = tb_false;
return tb_true;
}
break;
default:
break;
}
return tb_false;

}

/* //////////////////////////////////////////////////////////////////////////////////////
* interface implementation
*/
tb_stream_ref_t tb_stream_init_buffer()
{
return tb_stream_init( TB_STREAM_TYPE_BUFF
, sizeof(tb_stream_buffer_t)
, 0
, tb_stream_buffer_open
, tb_stream_buffer_clos
, tb_stream_buffer_exit
, tb_stream_buffer_ctrl
, tb_stream_buffer_wait
, tb_stream_buffer_read
, tb_stream_buffer_writ
, tb_stream_buffer_seek
, tb_null
, tb_null);
}
tb_stream_ref_t tb_stream_init_from_buffer(tb_buffer_ref_t buffer)
{
// check
tb_assert_and_check_return_val(buffer, tb_null);

// done
tb_bool_t ok = tb_false;
tb_stream_ref_t stream = tb_null;
do
{
// init stream
stream = tb_stream_init_buffer();
tb_assert_and_check_break(stream);

// set buffer and size
if (!tb_stream_ctrl(stream, TB_STREAM_CTRL_BUFF_SET_BUFFER, buffer)) break;

// ok
ok = tb_true;

} while (0);

// failed?
if (!ok)
{
// exit it
if (stream) tb_stream_exit(stream);
stream = tb_null;
}

// ok
return stream;
}
7 changes: 5 additions & 2 deletions src/tbox/stream/prefix.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ typedef enum __tb_stream_type_e
, TB_STREAM_TYPE_SOCK = 2
, TB_STREAM_TYPE_HTTP = 3
, TB_STREAM_TYPE_DATA = 4
, TB_STREAM_TYPE_FLTR = 5
, TB_STREAM_TYPE_USER = 6 ///!< for user defined stream type
, TB_STREAM_TYPE_BUFF = 5
, TB_STREAM_TYPE_FLTR = 6
, TB_STREAM_TYPE_USER = 7 ///!< for user defined stream type

}tb_stream_type_e;

Expand Down Expand Up @@ -151,6 +152,8 @@ typedef enum __tb_stream_ctrl_e
, TB_STREAM_CTRL_FLTR_SET_STREAM = TB_STREAM_CTRL(TB_STREAM_TYPE_FLTR, 3)
, TB_STREAM_CTRL_FLTR_SET_FILTER = TB_STREAM_CTRL(TB_STREAM_TYPE_FLTR, 4)

, TB_STREAM_CTRL_BUFF_SET_BUFFER = TB_STREAM_CTRL(TB_STREAM_TYPE_BUFF, 1)

}tb_stream_ctrl_e;

#endif
9 changes: 9 additions & 0 deletions src/tbox/stream/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,15 @@ tb_stream_ref_t tb_stream_init_filter_from_charset(tb_stream_ref_t strea
*/
tb_stream_ref_t tb_stream_init_filter_from_chunked(tb_stream_ref_t stream, tb_bool_t dechunked);

/*! init stream from buffer
*
* @param stream the stream
* @param buffer the buffer
*
* @return the stream
*/
tb_stream_ref_t tb_stream_init_from_buffer(tb_buffer_ref_t buffer);

/*! wait stream
*
* blocking wait the single event object, so need not aiop
Expand Down