Skip to content

Commit 1b82a45

Browse files
committed
Fixed for ringbuffer
1 parent 5c8c8a5 commit 1b82a45

File tree

10 files changed

+249
-275
lines changed

10 files changed

+249
-275
lines changed

include/Connection.h

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,9 @@ int swConnection_send_blocking(int fd, void *data, int length, int timeout);
9393
int swConnection_buffer_send(swConnection *conn);
9494

9595
swString* swConnection_get_string_buffer(swConnection *conn);
96-
int swConnection_send_string_buffer(swConnection *conn);
9796
void swConnection_clear_string_buffer(swConnection *conn);
9897
volatile swBuffer_trunk* swConnection_get_out_buffer(swConnection *conn, uint32_t type);
9998
volatile swBuffer_trunk* swConnection_get_in_buffer(swConnection *conn);
100-
int swConnection_send_in_buffer(swConnection *conn);
10199
int swConnection_sendfile(swConnection *conn, char *filename);
102100

103101
#ifdef SW_USE_OPENSSL

include/Server.h

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ typedef struct _swReactorThread
112112
swReactor reactor;
113113
swUdpFd *udp_addrs;
114114
swCloseQueue close_queue;
115+
swMemoryPool *buffer_input;
115116
int c_udp_fd;
116117
} swReactorThread;
117118

@@ -488,6 +489,8 @@ void swServer_connection_close(swServer *serv, int fd, int notify);
488489
#define swServer_set_minfd(serv,maxfd) (serv->connection_list[SW_SERVER_MIN_FD_INDEX].fd=maxfd)
489490
#define swServer_get_minfd(serv) (serv->connection_list[SW_SERVER_MIN_FD_INDEX].fd)
490491

492+
#define swServer_get_thread(serv, reactor_id) (&(serv->reactor_threads[reactor_id]))
493+
491494
static sw_inline swWorker* swServer_get_worker(swServer *serv, uint16_t worker_id)
492495
{
493496
if (worker_id > serv->worker_num + SwooleG.task_worker_num)
@@ -580,28 +583,6 @@ void swWorker_free(swWorker *worker);
580583
void swWorker_signal_init(void);
581584
void swWorker_signal_handler(int signo);
582585

583-
static sw_inline void* swWorker_input_alloc(swWorker *worker, uint32_t size)
584-
{
585-
void *ptr = NULL;
586-
int try_count = 0;
587-
588-
while (1)
589-
{
590-
ptr = worker->pool_input->alloc(worker->pool_input, size);
591-
if (ptr == NULL)
592-
{
593-
if (try_count > SW_RINGBUFFER_WARNING)
594-
{
595-
swWarn("Input memory pool is full. Wait memory collect. alloc(%d)", size);
596-
}
597-
swYield();
598-
continue;
599-
}
600-
break;
601-
}
602-
return ptr;
603-
}
604-
605586
int swServer_master_onAccept(swReactor *reactor, swDataHead *event);
606587
void swServer_master_onReactorTimeout(swReactor *reactor);
607588
void swServer_master_onReactorFinish(swReactor *reactor);

include/swoole.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -725,7 +725,6 @@ struct _swWorker
725725

726726
swProcessPool *pool;
727727

728-
swMemoryPool *pool_input;
729728
swMemoryPool *pool_output;
730729

731730
/**

src/core/log.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ int swLog_init(char *logfile)
2525
SwooleG.log_fd = open(logfile, O_APPEND| O_RDWR | O_CREAT, 0666);
2626
if (SwooleG.log_fd < 0)
2727
{
28-
swWarn("open() log file[%s] failed. Error: %s[%d]", logfile, strerror(errno), errno);
28+
printf("open(%s) failed. Error: %s[%d]", logfile, strerror(errno), errno);
2929
return SW_ERR;
3030
}
3131
return SW_OK;
@@ -77,4 +77,3 @@ void swLog_put(int level, char *cnt)
7777
//write to log failed.
7878
}
7979
}
80-

src/factory/FactoryProcess.c

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -126,15 +126,6 @@ int swFactoryProcess_start(swFactory *factory)
126126
{
127127
return SW_ERR;
128128
}
129-
130-
#ifdef SW_USE_RINGBUFFER
131-
worker->pool_input = swRingBuffer_new(SwooleG.serv->buffer_input_size, 1);
132-
if (!worker->pool_input)
133-
{
134-
return SW_ERR;
135-
}
136-
#endif
137-
138129
}
139130

140131
//必须先启动manager进程组,否则会带线程fork

src/network/Connection.c

Lines changed: 0 additions & 174 deletions
Original file line numberDiff line numberDiff line change
@@ -156,91 +156,6 @@ int swConnection_sendfile(swConnection *conn, char *filename)
156156
return SW_OK;
157157
}
158158

159-
int swConnection_send_string_buffer(swConnection *conn)
160-
{
161-
int ret;
162-
swString *buffer = conn->object;
163-
swFactory *factory = SwooleG.factory;
164-
swDispatchData task;
165-
166-
task.data.info.fd = conn->fd;
167-
task.data.info.from_id = conn->from_id;
168-
169-
#ifdef SW_USE_RINGBUFFER
170-
171-
swServer *serv = SwooleG.serv;
172-
int target_worker_id = swServer_worker_schedule(serv, conn->fd);
173-
swWorker *worker = swServer_get_worker(serv, target_worker_id);
174-
swMemoryPool *pool = worker->pool_input;
175-
swPackage package;
176-
177-
package.length = buffer->length;
178-
while (1)
179-
{
180-
package.data = pool->alloc(pool, buffer->length);
181-
if (package.data == NULL)
182-
{
183-
swYield();
184-
swWarn("reactor memory pool full.");
185-
continue;
186-
}
187-
break;
188-
}
189-
task.data.info.type = SW_EVENT_PACKAGE;
190-
task.data.info.len = sizeof(package);
191-
task.target_worker_id = target_worker_id;
192-
//swoole_dump_bin(package.data, 's', buffer->length);
193-
memcpy(package.data, buffer->str, buffer->length);
194-
memcpy(task.data.data, &package, sizeof(package));
195-
ret = factory->dispatch(factory, &task);
196-
197-
#else
198-
int send_n = buffer->length;
199-
task.data.info.type = SW_EVENT_PACKAGE_START;
200-
task.target_worker_id = -1;
201-
202-
/**
203-
* lock target
204-
*/
205-
SwooleTG.factory_lock_target = 1;
206-
207-
void *send_ptr = buffer->str;
208-
do
209-
{
210-
if (send_n > SW_BUFFER_SIZE)
211-
{
212-
task.data.info.len = SW_BUFFER_SIZE;
213-
memcpy(task.data.data, send_ptr, SW_BUFFER_SIZE);
214-
}
215-
else
216-
{
217-
task.data.info.type = SW_EVENT_PACKAGE_END;
218-
task.data.info.len = send_n;
219-
memcpy(task.data.data, send_ptr, send_n);
220-
}
221-
222-
swTrace("dispatch, type=%d|len=%d\n", _send.info.type, _send.info.len);
223-
224-
ret = factory->dispatch(factory, &task);
225-
//TODO: 处理数据失败,数据将丢失
226-
if (ret < 0)
227-
{
228-
swWarn("factory->dispatch failed.");
229-
}
230-
send_n -= task.data.info.len;
231-
send_ptr += task.data.info.len;
232-
}
233-
while (send_n > 0);
234-
235-
/**
236-
* unlock
237-
*/
238-
SwooleTG.factory_target_worker = -1;
239-
SwooleTG.factory_lock_target = 0;
240-
241-
#endif
242-
return ret;
243-
}
244159

245160
void swConnection_clear_string_buffer(swConnection *conn)
246161
{
@@ -252,95 +167,6 @@ void swConnection_clear_string_buffer(swConnection *conn)
252167
}
253168
}
254169

255-
int swConnection_send_in_buffer(swConnection *conn)
256-
{
257-
swDispatchData task;
258-
swFactory *factory = SwooleG.factory;
259-
260-
task.data.info.fd = conn->fd;
261-
task.data.info.from_id = conn->from_id;
262-
263-
swBuffer *buffer = conn->in_buffer;
264-
swBuffer_trunk *trunk = swBuffer_get_trunk(buffer);
265-
266-
#ifdef SW_USE_RINGBUFFER
267-
268-
swServer *serv = SwooleG.serv;
269-
uint16_t target_worker_id = swServer_worker_schedule(serv, conn->fd);
270-
swWorker *worker = swServer_get_worker(serv, target_worker_id);
271-
swMemoryPool *pool = worker->pool_input;
272-
swPackage package;
273-
274-
package.length = 0;
275-
while (1)
276-
{
277-
package.data = pool->alloc(pool, buffer->length);
278-
if (package.data == NULL)
279-
{
280-
swYield();
281-
swWarn("reactor memory pool full.");
282-
continue;
283-
}
284-
break;
285-
}
286-
task.data.info.type = SW_EVENT_PACKAGE;
287-
288-
while (trunk != NULL)
289-
{
290-
task.data.info.len = trunk->length;
291-
memcpy(package.data + package.length, trunk->store.ptr, trunk->length);
292-
package.length += trunk->length;
293-
294-
swBuffer_pop_trunk(buffer, trunk);
295-
trunk = swBuffer_get_trunk(buffer);
296-
}
297-
task.data.info.len = sizeof(package);
298-
task.target_worker_id = target_worker_id;
299-
memcpy(task.data.data, &package, sizeof(package));
300-
//swWarn("[ReactorThread] copy_n=%d", package.length);
301-
return factory->dispatch(factory, &task);
302-
303-
#else
304-
305-
int ret;
306-
task.data.info.type = SW_EVENT_PACKAGE_START;
307-
task.target_worker_id = -1;
308-
309-
/**
310-
* lock target
311-
*/
312-
SwooleTG.factory_lock_target = 1;
313-
314-
while (trunk != NULL)
315-
{
316-
task.data.info.len = trunk->length;
317-
memcpy(task.data.data, trunk->store.ptr, task.data.info.len);
318-
//package end
319-
if (trunk->next == NULL)
320-
{
321-
task.data.info.type = SW_EVENT_PACKAGE_END;
322-
}
323-
ret = factory->dispatch(factory, &task);
324-
//TODO: 处理数据失败,数据将丢失
325-
if (ret < 0)
326-
{
327-
swWarn("factory->dispatch() failed.");
328-
}
329-
swBuffer_pop_trunk(buffer, trunk);
330-
trunk = swBuffer_get_trunk(buffer);
331-
332-
swTrace("send2worker[trunk_num=%d][type=%d]\n", buffer->trunk_num, _send.info.type);
333-
}
334-
/**
335-
* unlock
336-
*/
337-
SwooleTG.factory_target_worker = -1;
338-
SwooleTG.factory_lock_target = 0;
339-
340-
#endif
341-
return SW_OK;
342-
}
343-
344170
volatile swBuffer_trunk* swConnection_get_in_buffer(swConnection *conn)
345171
{
346172
volatile swBuffer_trunk *trunk = NULL;

0 commit comments

Comments
 (0)