本系列以Amoeba 997版本分支源码(目前最新版本分支)作为分析源,amoeba源码SVN地址:http://amoeba.googlecode.com/svn/trunk。在分析源码前,需要将源码导入到Eclipse工程里面。amoeba共分为amoeba、amoeba-mysql、amoeba-memcached、amoeba-aladdin、amoeba-mongodb、amoeba-manager、amoeba-geteway七个子工程。本系列主要分析amoeba、amoeba-mysql源码,其他子工程会根据需要分析说明
AmoebaProxyServer作为Amoeba的入口类存在于amoeba子工程的com.meidusa.amoeba.server包下。AmoebaProxyServer的main方法提供了完整的启动流程和方法。
AmoebaProxyServer提供了两个日志记录对象,一个用于基本的日志记录,一个用于对报告进行日志记录。
1 首先来看main方法的第一段代码(用于判断并处理启动或停止命令)。代码中加入少量注释,便于理解:
String level = System.getProperty("benchmark.level", "warn"); System.setProperty("benchmark.level", level); if(args.length>=1){ ShutdownClient client = new ShutdownClient(MonitorConstant.APPLICATION_NAME); MonitorCommandPacket packet = new MonitorCommandPacket(); //监控命令报文对象 //通过第一个命令行参数进行处理判断 if("start".equalsIgnoreCase(args[0])){ //处理启动amoeba命令 packet.funType = MonitorCommandPacket.FUN_TYPE_PING; if(client.run(packet)){ //如果已经启动,则提示amoeba server启动信息,并异常退出 System.out.println("amoeba server is running with port="+client.getPort()); System.exit(-1); } }else{ //处理停止amoeba命令 packet.funType = MonitorCommandPacket.FUN_TYPE_AMOEBA_SHUTDOWN; if(client.run(packet)){ //停止成功提示 System.out.println("amoeba server shutting down with port="+client.getPort()); }else{ //停止失败提示(原因为amoeba server并未启动) System.out.println("amoeba server not running with port="+client.getPort()); } System.exit(0); } }else{ //没有传入参数,则提示amoeba命令行用法 System.out.println("amoeba start|stop"); System.exit(0); }
以上代码中处理命令的功能主要通过ShutdownClient.run()方法来执行处理。作者在方法注释里提示“return false if server not running”,不管命令是start或者stop均可利用该含义返回值进行统一处理。如果为start命令,返回false表示之前未启动,返回true表示amoeba已经启动过;如果为stop命令,返回false表示amoeba并未启动,无法停止,返回true则表示停止成功。ShutsownClient.run()方法完整代码(加注释)如下所示:
/** * 第一次启动时,由于监控服务并未启动,程序将在socket建立时抛出异常返回false * @param command * @return false if server not running */ public boolean run(MonitorCommandPacket command) { if(port <=0){ //加载.Amoeba.shutdown.port(文件内容形如127.0.0.1:22334),从文件中获取监控IP和端口 socketInfoFile = new File(ConfigUtil.filter("${amoeba.home}"),appplicationName+".shutdown.port"); if(!socketInfoFile.exists()){ return false; } try { BufferedReader reader = new BufferedReader(new FileReader(socketInfoFile)); String sport = reader.readLine(); String tmp[] = StringUtil.split(sport, ":"); if(tmp.length <=1){ return false; } this.port = Integer.parseInt(tmp[1]); this.host = tmp[0]; reader.close(); }catch (Exception e) { e.printStackTrace(); return false; } } try { //访问监听端口。如果监控服务已经启动,则socket会成功建立,否则建立失败,返回false。 //amoeba在初次启动时,监控服务并未启动,流程进入异常,返回false Socket socket = null; try{ if(host == null){ //无配置,则读取localhost地址 socket = new Socket(InetAddress.getLocalHost(),port); }else{ if("0.0.0.0".equals(host)){ //配置为全零网络(默认网络),则读取localhost地址 socket = new Socket(InetAddress.getLocalHost(),port); }else{ socket = new Socket(host, port); } } }catch(IOException e){ //第一次启动,将进入异常,返回false return false; } socket.getOutputStream().write(command.toByteBuffer(null).array()); socket.getOutputStream().flush(); PacketInputStream pis = new MonitorPacketInputStream(); byte[] message = pis.readPacket(socket.getInputStream()); MonitorCommandPacket response = new MonitorCommandPacket(); response.init(message, null); if(response.funType == MonitorConstant.FUN_TYPE_OK){ System.out.println("remote application= "+ appplicationName+":"+port+" response OK"); } socket.close(); return true; } catch (Exception e) { e.printStackTrace(); return false; } }
以上代码中:ConfigUtil.filter("${amoeba.home}") 指加载环境变量中key=amoeba.home的值。filter过滤掉${}后获取系统环境变量
2 log4j配置加载,主要对配置XML加载并解析处理。通过log4j的FileWatchdog实现配置动态更新加载。
String log4jConf = System.getProperty("log4j.conf","${amoeba.home}/conf/log4j.xml"); log4jConf = ConfigUtil.filter(log4jConf); File logconf = new File(log4jConf); if(logconf.exists() && logconf.isFile()){ DOMConfigurator.configureAndWatch(logconf.getAbsolutePath(), System.getProperties()); }
3 amoeba配置加载,用于加载amoeba.xml的系统配置
final Logger logger = Logger.getLogger(AmoebaProxyServer.class); String config = System.getProperty("amoeba.conf","${amoeba.home}/conf/amoeba.xml"); String contextClass = System.getProperty("amoeba.context.class",ProxyRuntimeContext.class.getName()); if(contextClass != null){ //生成运行上下文环境对象 ProxyRuntimeContext context = (ProxyRuntimeContext)Class.forName(contextClass).newInstance(); ProxyRuntimeContext.setInstance(context); } config = ConfigUtil.filter(config); File configFile = new File(config); if(config == null || !configFile.exists()){ logger.error("could not find config file:"+configFile.getAbsolutePath()); System.exit(-1); }else{ //通过配置文件初始化上下文环境对象 ProxyRuntimeContext.getInstance().init(configFile.getAbsolutePath()); } //注册到报告列表中 registerReporter(ProxyRuntimeContext.getInstance()); for(ConnectionManager connMgr :ProxyRuntimeContext.getInstance().getConnectionManagerList().values()){ registerReporter(connMgr); }
4 启动每一个注入的服务
Map<String,Object> context = new HashMap<String,Object>(); context.putAll(ProxyRuntimeContext.getInstance().getConnectionManagerList()); List<BeanObjectEntityConfig> serviceConfigList = ProxyRuntimeContext.getInstance().getConfig().getServiceConfigList(); //初始化注入的每一个服务,并启动 //amoeba在启动时会启动监控服务和接收服务。用于mysql代理的需要与amoeba_mysql一起使用,因为注入的将会是amoeba_mysql中的服务器 for(BeanObjectEntityConfig serverConfig : serviceConfigList){ Service service = (Service)serverConfig.createBeanObject(false,context); service.init(); service.start(); //将每一个服务添加到支持优先级处理的释放钩子中 PriorityShutdownHook.addShutdowner(service); registerReporter(service); }
5 最后是开启report监控线程,一分钟执行一次,如果对这块要求不高,可以将时长修改长一点,如10分钟。