rabbitmq erlang 源代码读 三 core process启动 gen_server

xiaoyatouvsanan 2010-01-19

from http://blog.chinaunix.net/u3/103972/showart.php?id=2130456 (这里格式较好)

我原来的博客zdx3578.cublog.cn

看erlang自己的源代码proc_lib.erl

%% Fetch the process-info Item for Pid.
%%
%% A Pid living on the local node is queried directly with process_info/2.
%% Otherwise, when the Pid's node appears in nodes(), the query is forwarded
%% with rpc:call/4 and its result is passed through check/1. When the node
%% is not in nodes(), the atom 'hidden' is returned.
proc_info(Pid, Item) when node(Pid) =:= node() ->
    process_info(Pid, Item);
proc_info(Pid, Item) ->
    case lists:member(node(Pid), nodes()) of
        true ->
            check(rpc:call(node(Pid), erlang, process_info, [Pid, Item]));
        _ ->
            hidden
    end.

Erlang 把底层远程调用都隐藏了,这就是“显意编程”(intentional programming)吧,参见 http://erlang-china.org/uploads/2007/09/joes-thesis.zip 的 4.5 节

gen_server.erl源代码看看

gen_server.erl%%从粗体看下去再从斜体看上来

%% Start a server for callback module Mod without linking:
%% delegates to gen:start/5 with the atom 'nolink'.
start(Mod, Args, Options) ->
    gen:start(?MODULE, nolink, Mod, Args, Options).

%% Start a server for callback module Mod, linked to the caller:
%% delegates to gen:start/5 with the atom 'link'.
start_link(Mod, Args, Options) ->
    gen:start(?MODULE, link, Mod, Args, Options).

gen.erl

%% Entry point used by the generic behaviours: hands every argument
%% straight on to do_spawn/5.
start(GenMod, LinkP, Mod, Args, Options) ->
    do_spawn(GenMod, LinkP, Mod, Args, Options).

%% Spawn the new process through proc_lib:start/5.
%%
%% The start timeout comes from timeout(Options) and the spawn options from
%% spawn_opts(Options). The spawned process runs ?MODULE:init_it/6 with the
%% calling process as Starter and the atom 'self' in the Parent position.
do_spawn(GenMod, _, Mod, Args, Options) ->
    Time = timeout(Options),
    %% Arguments (the A of the M, F, A triple) for ?MODULE:init_it/6.
    InitArgs = [GenMod, self(), self, Mod, Args, Options],
    proc_lib:start(?MODULE, init_it, InitArgs, Time, spawn_opts(Options)).

proc_lib.erl

%% Spawn M:F(A) with the given spawn options via ?MODULE:spawn_opt/4, then
%% hand the new Pid and Timeout to sync_wait/2 and return its result.
-spec start(atom(), atom(), [term()], timeout(), [spawn_option()]) -> term().
start(M, F, A, Timeout, SpawnOpts) when is_atom(M), is_atom(F), is_list(A) ->
    Pid = ?MODULE:spawn_opt(M, F, A, SpawnOpts),
    sync_wait(Pid, Timeout).

%% Spawn a process that starts in ?MODULE:init_p/5.
%%
%% The caller's name (get_my_name/0) and ancestor list (get_ancestors/0) are
%% captured before spawning and passed to init_p/5 together with M, F and A.
%% Opts is first run through check_for_monitor/1 and then given unchanged to
%% erlang:spawn_opt/4.
spawn_opt(M, F, A, Opts) when is_atom(M), is_atom(F), is_list(A) ->
    Parent = get_my_name(),
    Ancestors = get_ancestors(),
    check_for_monitor(Opts),
    erlang:spawn_opt(?MODULE, init_p, [Parent, Ancestors, M, F, A], Opts).

%% First function run inside a process spawned by spawn_opt/4.
%%
%% Stores [Parent | Ancestors] under '$ancestors' and the result of
%% trans_init(M, F, A) under '$initial_call' in the process dictionary,
%% then applies M:F(A) through init_p_do_apply/3.
-spec init_p(pid(), [pid()], atom(), atom(), [term()]) -> term().
init_p(Parent, Ancestors, M, F, A) when is_atom(M), is_atom(F), is_list(A) ->
    put('$ancestors', [Parent | Ancestors]),
    put('$initial_call', trans_init(M, F, A)),
    init_p_do_apply(M, F, A).

%% Apply M:F(A); any exception of any class raised by the call is handed
%% to exit_p/2 together with its class and reason.
init_p_do_apply(M, F, A) ->
    try apply(M, F, A)
    catch
        Class:Reason -> exit_p(Class, Reason)
    end.

gen.erl 的如下语句

proc_lib:start(?MODULE, init_it, [GenMod, self(), self, Mod, Args, Options], Time, spawn_opts(Options)). %% MFA:新进程就执行 gen.erl 的 init_it

%% Function run in the freshly spawned process (the F and A of the M, F, A
%% given to proc_lib:start/5). Adds self() in the Name position and
%% continues in init_it2/7.
init_it(GenMod, Starter, Parent, Mod, Args, Options) ->
    init_it2(GenMod, Starter, Parent, self(), Mod, Args, Options).

%% Hand control to the behaviour module: calls GenMod:init_it/6.
%% In the walkthrough GenMod is gen_server, so this runs gen_server's init_it.
init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
    GenMod:init_it(Starter, Parent, Name, Mod, Args, Options).

gen_server.erl

%% Initialise the server process and enter the main loop.
%%
%% When the Parent argument is the atom 'self' it is replaced by self().
%% Otherwise the name and debug options are resolved (name/1,
%% debug_options/2) and the callback module's init/1 is invoked under
%% `catch`. On {ok, State} or {ok, State, Timeout} the starter is
%% acknowledged with {ok, self()} via proc_lib:init_ack/2 and the process
%% enters loop/6, with timeout 'infinity' or the returned Timeout.
%% NOTE: this is an excerpt; the remaining case clauses were elided by the
%% author of the walkthrough.
init_it(Starter, self, Name, Mod, Args, Options) ->
    init_it(Starter, self(), Name, Mod, Args, Options);
init_it(Starter, Parent, Name0, Mod, Args, Options) ->
    Name = name(Name0),
    Debug = debug_options(Name, Options),
    case catch Mod:init(Args) of    %% gen_server calls the init of module Mod
        {ok, State} ->
            proc_lib:init_ack(Starter, {ok, self()}),
            loop(Parent, Name, State, Mod, infinity, Debug);    %% start looping
        {ok, State, Timeout} ->
            proc_lib:init_ack(Starter, {ok, self()}),
            loop(Parent, Name, State, Mod, Timeout, Debug);
        %% ... (remaining clauses omitted in this excerpt)
    end.

%%% The MAIN loop.

%%%---------------------------------------------------

%% Main server loop.
%%
%% With the timeout position set to the atom 'hibernate' the process
%% hibernates through proc_lib:hibernate/3 and resumes in ?MODULE:wake_hib.
%% Otherwise it waits for any message; if none arrives within Time the atom
%% 'timeout' is used as the message. The result is passed to decode_msg/8
%% with the hibernation flag set to false.
loop(Parent, Name, State, Mod, hibernate, Debug) ->
    proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, Debug]);
loop(Parent, Name, State, Mod, Time, Debug) ->
    Msg = receive
              Input ->
                  Input
          after Time ->
                  timeout
          end,
    decode_msg(Msg, Parent, Name, State, Mod, Time, Debug, false).

%% Classify an incoming message and route it.
%%
%% - {system, From, Req} goes to sys:handle_system_msg/7.
%% - {'EXIT', Parent, Reason}, where Parent is the already-bound parent,
%%   terminates the server via terminate/6.
%% - Any other message goes to handle_msg/5 when Debug is the empty list;
%%   otherwise it is first reported through sys:handle_debug/4 and then
%%   passed to handle_msg/6 with the updated debug state.
decode_msg(Msg, Parent, Name, State, Mod, Time, Debug, Hib) ->
    case Msg of
        {system, From, Req} ->
            sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
                                  [Name, State, Mod, Time], Hib);
        {'EXIT', Parent, Reason} ->
            terminate(Reason, Name, Msg, Mod, State, Debug);
        _Msg when Debug =:= [] ->
            handle_msg(Msg, Parent, Name, State, Mod);
        _Msg ->
            Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
                                      Name, {in, Msg}),
            handle_msg(Msg, Parent, Name, State, Mod, Debug1)
    end.

%%% Message handling functions

%%%---------------------------------------------------

%% Route a non-call message to the callback module:
%% {'$gen_cast', Msg} goes to Mod:handle_cast/2, anything else to
%% Mod:handle_info/2.
dispatch({'$gen_cast', Msg}, Mod, State) ->
    Mod:handle_cast(Msg, State);
dispatch(Info, Mod, State) ->
    Mod:handle_info(Info, State).

%% Handle one message when no debug options are active.
%%
%% For {'$gen_call', From, Msg} the callback Mod:handle_call/3 is run under
%% `catch` and its result interpreted:
%%   {reply, Reply, NState[, Time1]}  -> reply to From, then keep looping
%%   {noreply, NState[, Time1]}       -> keep looping without replying
%%   {stop, Reason, Reply, NState}    -> run terminate/6 under `catch`
%%                                       (asserting it yields {'EXIT', R}),
%%                                       reply to From, then exit(R)
%%   anything else                    -> handle_common_reply/6
%% Every other message is run through dispatch/3 under `catch` and the
%% result passed to handle_common_reply/6.
handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod) ->
    case catch Mod:handle_call(Msg, From, State) of    %% call back handle_call
        {reply, Reply, NState} ->
            reply(From, Reply),
            loop(Parent, Name, NState, Mod, infinity, []);    %% continue looping
        {reply, Reply, NState, Time1} ->
            reply(From, Reply),
            loop(Parent, Name, NState, Mod, Time1, []);
        {noreply, NState} ->
            loop(Parent, Name, NState, Mod, infinity, []);
        {noreply, NState, Time1} ->
            loop(Parent, Name, NState, Mod, Time1, []);
        {stop, Reason, Reply, NState} ->
            {'EXIT', R} =
                (catch terminate(Reason, Name, Msg, Mod, NState, [])),
            reply(From, Reply),
            exit(R);
        Other ->
            handle_common_reply(Other, Parent, Name, Msg, Mod, State)
    end;
handle_msg(Msg, Parent, Name, State, Mod) ->
    Reply = (catch dispatch(Msg, Mod, State)),
    handle_common_reply(Reply, Parent, Name, Msg, Mod, State).

——————————————————————————————————————————————————————————————

rabbit.erl

start(normal,[])->

{ok,SupPid}=rabbit_sup:start_link(),

rabbit_sup.erl

-define(SERVER,?MODULE).

%% Start the supervisor with this module as its callback module and
%% register it locally under ?SERVER (i.e. {local, rabbit_sup}).
start_link() ->
    supervisor:start_link({local, ?SERVER}, ?MODULE, []).

%% Supervisor callback: returns the flags tuple {one_for_one, 10, 10}
%% together with an empty list of child specifications.
init([]) ->
    SupFlags = {one_for_one, 10, 10},
    {ok, {SupFlags, []}}.

rabbit.erl

%% Boot step labelled "core processes" (excerpt from a larger list of steps).
%% Each sub-step is asserted to return ok, so the first failure aborts the
%% step with a badmatch. rabbit_log, rabbit_router and rabbit_node_monitor
%% are started via start_child/1; the memory alarm setting is read from the
%% application environment key memory_alarms.
{"core processes",
 fun () ->
         ok = start_child(rabbit_log),
         ok = rabbit_hooks:start(),
         ok = rabbit_binary_generator:
                  check_empty_content_body_frame_size(),
         {ok, MemoryAlarms} = application:get_env(memory_alarms),
         ok = rabbit_alarm:start(MemoryAlarms),
         ok = rabbit_amqqueue:start(),
         ok = start_child(rabbit_router),
         ok = start_child(rabbit_node_monitor)
 end},

%% Start Mod as a child of the supervisor registered as rabbit_sup and
%% return ok. The child is started with Mod:start_link(); the rest of the
%% child specification is: transient, 100, worker, [Mod].
%% The name rabbit_sup is the one registered by rabbit_sup:start_link(),
%% which is called from start/2.
start_child(Mod) ->
    ChildSpec = {Mod, {Mod, start_link, []}, transient, 100, worker, [Mod]},
    {ok, _} = supervisor:start_child(rabbit_sup, ChildSpec),
    ok.

supervisor.erl:

-behaviour(gen_server).

%% Ask Supervisor to start a child described by ChildSpec by sending it
%% the request {start_child, ChildSpec} through call/2.
start_child(Supervisor, ChildSpec) ->
    call(Supervisor, {start_child, ChildSpec}).

%% Synchronous request to the supervisor process: a gen_server:call/3
%% with an 'infinity' timeout.
call(Supervisor, Req) ->
    gen_server:call(Supervisor, Req, infinity).

gen_server.erl

%% Synchronous call to a server.
%%
%% Sends Request labelled '$gen_call' through gen:call/4 (run under
%% `catch`). On {ok, Res} the reply Res is returned. If the call exited,
%% the caller exits with {Reason, {?MODULE, call, [Name, Request, Timeout]}}.
call(Name, Request, Timeout) ->
    case catch gen:call(Name, '$gen_call', Request, Timeout) of
        {ok, Res} ->
            Res;
        {'EXIT', Reason} ->
            exit({Reason, {?MODULE, call, [Name, Request, Timeout]}})
    end.

gen.erl

%% Clause of call/4 for a pid target: accepted when Timeout is 'infinity'
%% or a non-negative integer, and forwarded to do_call/4.
%% NOTE: excerpt; the further clauses of call/4 are not shown here.
call(Pid, Label, Request, Timeout)
  when is_pid(Pid), Timeout =:= infinity;
       is_pid(Pid), is_integer(Timeout), Timeout >= 0 ->
    do_call(Pid, Label, Request, Timeout);

%% Send a request to Process and wait for the response.
%%
%% The target node is taken from a {_, Node} tuple or from the pid itself.
%% Normal path: monitor the process, send {Label, {self(), Mref}, Request}
%% with the noconnect option (send failures are swallowed by `catch`), then
%% wait in wait_resp_mon/3.
%% Fallback path, taken when erlang:monitor/2 raises an error: monitor the
%% node instead. If a nodedown message for it is already in the mailbox the
%% caller exits with {nodedown, Node}; otherwise the request is sent with a
%% fresh make_ref() tag and the reply awaited in wait_resp/3.
do_call(Process, Label, Request, Timeout) ->
    Node = case Process of
               {_S, N} when is_atom(N) ->
                   N;
               _ when is_pid(Process) ->
                   node(Process)
           end,
    try erlang:monitor(process, Process) of
        Mref ->
            catch erlang:send(Process, {Label, {self(), Mref}, Request},
                              [noconnect]),
            wait_resp_mon(Node, Mref, Timeout)
    catch
        error:_ ->
            monitor_node(Node, true),
            receive
                {nodedown, Node} ->
                    monitor_node(Node, false),
                    exit({nodedown, Node})
            after 0 ->
                Tag = make_ref(),
                Process ! {Label, {self(), Tag}, Request},
                wait_resp(Node, Tag, Timeout)
            end
    end.

wait_resp_mon(Node,Mref,Timeout)->

..............

wait_resp(Node,Tag,Timeout)->

..............

gen_server.erl

%% Route a non-call message to the callback module:
%% {'$gen_cast', Msg} goes to Mod:handle_cast/2, anything else to
%% Mod:handle_info/2.
dispatch({'$gen_cast', Msg}, Mod, State) ->
    Mod:handle_cast(Msg, State);
dispatch(Info, Mod, State) ->
    Mod:handle_info(Info, State).

%% Handle one message when no debug options are active.
%%
%% For {'$gen_call', From, Msg} the callback Mod:handle_call/3 is run under
%% `catch` and its result interpreted:
%%   {reply, Reply, NState[, Time1]}  -> reply to From, then keep looping
%%   {noreply, NState[, Time1]}       -> keep looping without replying
%%   {stop, Reason, Reply, NState}    -> run terminate/6 under `catch`
%%                                       (asserting it yields {'EXIT', R}),
%%                                       reply to From, then exit(R)
%%   anything else                    -> handle_common_reply/6
%% Every other message is run through dispatch/3 under `catch` and the
%% result passed to handle_common_reply/6.
handle_msg({'$gen_call', From, Msg}, Parent, Name, State, Mod) ->
    case catch Mod:handle_call(Msg, From, State) of
        {reply, Reply, NState} ->
            reply(From, Reply),
            loop(Parent, Name, NState, Mod, infinity, []);
        {reply, Reply, NState, Time1} ->
            reply(From, Reply),
            loop(Parent, Name, NState, Mod, Time1, []);
        {noreply, NState} ->
            loop(Parent, Name, NState, Mod, infinity, []);
        {noreply, NState, Time1} ->
            loop(Parent, Name, NState, Mod, Time1, []);
        {stop, Reason, Reply, NState} ->
            {'EXIT', R} =
                (catch terminate(Reason, Name, Msg, Mod, NState, [])),
            reply(From, Reply),
            exit(R);
        Other ->
            handle_common_reply(Other, Parent, Name, Msg, Mod, State)
    end;
handle_msg(Msg, Parent, Name, State, Mod) ->
    Reply = (catch dispatch(Msg, Mod, State)),
    handle_common_reply(Reply, Parent, Name, Msg, Mod, State).

%% Interpret a callback result that is shared by the call, cast and info
%% paths.
%%
%%   {noreply, NState}          -> loop with timeout 'infinity'
%%   {noreply, NState, Time1}   -> loop with timeout Time1
%%   {stop, Reason, NState}     -> terminate with Reason and the new state
%%   {'EXIT', What}             -> the callback crashed under `catch`;
%%                                 terminate with What and the old State
%%   anything else              -> terminate with {bad_return_value, Reply}
%%                                 and the old State
handle_common_reply(Reply, Parent, Name, Msg, Mod, State) ->
    case Reply of
        {noreply, NState} ->
            loop(Parent, Name, NState, Mod, infinity, []);
        {noreply, NState, Time1} ->
            loop(Parent, Name, NState, Mod, Time1, []);
        {stop, Reason, NState} ->
            terminate(Reason, Name, Msg, Mod, NState, []);
        {'EXIT', What} ->
            terminate(What, Name, Msg, Mod, State, []);
        _ ->
            terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
    end.

supervisor.erl:

%% gen_server callback clause for the {start_child, ChildSpec} request.
%%
%% The spec is validated with check_childspec/1. On {ok, Child} the child is
%% started through handle_start_child/2 and its response is replied together
%% with the new state; any other validation result What is replied as
%% {error, What} with the state unchanged.
%% NOTE: excerpt; the further clauses of handle_call/3 are not shown here.
handle_call({start_child, ChildSpec}, _From, State) ->
    case check_childspec(ChildSpec) of
        {ok, Child} ->
            {Resp, NState} = handle_start_child(Child, State),
            {reply, Resp, NState};
        What ->
            {reply, {error, What}, State}
    end;

相关推荐