@@ -873,17 +873,61 @@ int swFactoryProcess_notify(swFactory *factory, swDataHead *ev)
873
873
return factory -> dispatch (factory , (swDispatchData * ) & sw_notify_data );
874
874
}
875
875
876
+
877
+ static sw_inline uint32_t swServer_worker_schedule (swServer * serv , uint32_t schedule_key )
878
+ {
879
+ uint32_t target_worker_id = 0 ;
880
+
881
+ //polling mode
882
+ if (serv -> dispatch_mode == SW_DISPATCH_ROUND )
883
+ {
884
+ target_worker_id = (serv -> worker_round_id ++ ) % serv -> worker_num ;
885
+ }
886
+ //Using the FD touch access to hash
887
+ else if (serv -> dispatch_mode == SW_DISPATCH_FDMOD )
888
+ {
889
+ target_worker_id = schedule_key % serv -> worker_num ;
890
+ }
891
+ //Preemptive distribution
892
+ else
893
+ {
894
+ if (serv -> ipc_mode == SW_IPC_MSGQUEUE )
895
+ {
896
+ //msgsnd参数必须>0
897
+ //worker进程中正确的mtype应该是pti + 1
898
+ target_worker_id = serv -> worker_num ;
899
+ }
900
+ else
901
+ {
902
+ int i ;
903
+ sw_atomic_t * round = & SwooleTG .worker_round_i ;
904
+ for (i = 0 ; i < serv -> worker_num ; i ++ )
905
+ {
906
+ sw_atomic_fetch_add (round , 1 );
907
+ target_worker_id = (* round ) % serv -> worker_num ;
908
+
909
+ if (serv -> workers [target_worker_id ].status == SW_WORKER_IDLE )
910
+ {
911
+ break ;
912
+ }
913
+ }
914
+ swTrace ("schedule=%d|round=%d\n" , target_worker_id , * round );
915
+ }
916
+ }
917
+ return target_worker_id ;
918
+ }
919
+
876
920
/**
877
921
* [ReactorThread] dispatch request to worker
878
922
*/
879
923
int swFactoryProcess_dispatch (swFactory * factory , swDispatchData * task )
880
924
{
881
- int schedule_key ;
882
- int send_len = sizeof (task -> data .info ) + task -> data .info .len ;
883
- int target_worker_id = task -> target_worker_id ;
925
+ uint32_t schedule_key ;
926
+ uint32_t send_len = sizeof (task -> data .info ) + task -> data .info .len ;
927
+ uint16_t target_worker_id ;
884
928
swServer * serv = SwooleG .serv ;
885
929
886
- if (target_worker_id < 0 )
930
+ if (task -> target_worker_id < 0 )
887
931
{
888
932
//udp use remote port
889
933
if (task -> data .info .type == SW_EVENT_UDP || task -> data .info .type == SW_EVENT_UDP6
@@ -915,7 +959,20 @@ int swFactoryProcess_dispatch(swFactory *factory, swDispatchData *task)
915
959
target_worker_id = swServer_worker_schedule (serv , schedule_key );
916
960
}
917
961
}
918
- return swReactorThread_send2worker ((void * ) & (task -> data ), send_len , target_worker_id );
962
+ else
963
+ {
964
+ target_worker_id = task -> target_worker_id ;
965
+ }
966
+
967
+ if (SwooleTG .type == SW_THREAD_REACTOR )
968
+ {
969
+ return swReactorThread_send2worker ((void * ) & (task -> data ), send_len , target_worker_id );
970
+ }
971
+ else
972
+ {
973
+ swTrace ("dispatch to worker#%d" , target_worker_id );
974
+ return swServer_send2worker_blocking (serv , (void * ) & (task -> data ), send_len , target_worker_id );
975
+ }
919
976
}
920
977
921
978
static int swFactoryProcess_writer_start (swFactory * factory )
0 commit comments