涓轰粈涔堥€夋嫨RocketMQ
鎴戜滑鏉ョ湅鐪嬪畼鏂瑰洖绛旓細
鈥滄垜浠爺绌跺彂鐜帮紝瀵逛簬ActiveMQ鑰岃█锛岄殢鐫€瓒婃潵瓒婂鐨勪娇鐢╭ueues鍜宼opics锛屽叾IO鎴愪负浜嗙摱棰堛€傛煇浜涙儏鍐典笅锛屾秷璐硅€呯紦鎱紙娑堣垂鑳藉姏涓嶈冻锛夎繕浼氭嫋鎱㈢敓浜ц€咃紙閫犳垚娑堟伅闃诲锛夈€傝櫧鐒舵垜浠仛浜嗘渶澶у姫鍔涜繘琛屼紭鍖栵細鑺傛祦銆佹柇璺櫒鎴栬€呭洖閫€锛屼絾鏄苟涓嶈兘杩涜浼橀泤鐨勬墿灞曘€傚洜姝ゆ垜浠紑濮嬩笓娉ㄤ簬浣跨敤鏃朵笅闈炲父娴佽鐨刱afka锛屼絾鏄粛鐒朵笉鑳芥弧瓒虫垜浠殑瑕佹眰锛屽浣庡欢杩熷拰楂樺彲闈犳€э紝璇︽儏瑙?a style="color: #336699;" target="_blank" href="http://rocketmq.incubator.apache.org/rocketmq/how-to-support-more-queues-in-rocketmq/">杩欓噷銆傚湪杩欐牱鐨勮儗鏅笅锛屾垜浠喅瀹氬紑鍙戜竴涓柊鐨勬秷鎭腑闂翠欢鏉ュ鐞嗕竴绯诲垪骞挎硾鐨勪娇鐢ㄥ満鏅紝鍖呮嫭浠庝紶缁熺殑鍙戝竷/璁㈤槄鍦烘櫙鍒伴珮瀹归噺鐨勫疄鏃朵氦鏄撶郴缁熶腑涓嶅厑璁告秷鎭涪澶辩殑鍦烘櫙銆傗€?/p>
鍚勪綅鐪嬪畼涔熷彲浠ユ挳杩欓噷鍘荤湅鐪?a style="color: #336699;" target="_blank" href="http://rocketmq.incubator.apache.org/docs/motivation/">RocketMQ涓嶢ctiveMQ浠ュ強Kafka鐨勬瘮杈?/a>銆?/p>
聽
鏍稿績姒傚康
- 鐢熶骇鑰咃紙Producer锛夛細娑堟伅鍙戦€佹柟锛屽皢涓氬姟绯荤粺涓骇鐢熺殑娑堟伅鍙戦€佸埌brokers锛坆rokers鍙互鐞嗚В涓烘秷鎭唬鐞嗭紝鐢熶骇鑰呭拰娑堣垂鑰呬箣闂存槸閫氳繃brokers杩涜娑堟伅鐨勯€氫俊锛夛紝rocketmq鎻愪緵浜嗕互涓嬫秷鎭彂閫佹柟寮?span style="white-space: pre;">锛氬悓姝ャ€佸紓姝ャ€佸崟鍚?/span>銆?/li>
- 鐢熶骇鑰呯粍锛圥roducer Group锛夛細鐩稿悓瑙掕壊鐨勭敓浜ц€呰褰掍负鍚屼竴缁勶紝姣斿閫氬父鎯呭喌涓嬩竴涓湇鍔′細閮ㄧ讲澶氫釜瀹炰緥锛岃繖澶氫釜瀹炰緥灏辨槸涓€涓粍锛岀敓浜ц€呭垎缁勭殑浣滅敤鍙綋鐜板湪娑堟伅鍥炴煡鐨勬椂鍊欙紝鍗冲鏋滀竴涓敓浜ц€呯粍涓殑涓€涓敓浜ц€呭疄渚嬪彂閫佷竴涓簨鍔℃秷鎭埌broker鍚庢寕鎺変簡锛岄偅涔坆roker浼氬洖鏌ユ瀹炰緥鎵€鍦ㄧ粍鐨勫叾浠栧疄渚嬶紝浠庤€岃繘琛屾秷鎭殑鎻愪氦鎴栧洖婊氭搷浣溿€?/li>
- 娑堣垂鑰咃紙Consumer锛夛細娑堟伅娑堣垂鏂癸紝浠巄rokers鎷夊彇娑堟伅銆傜珯鍦ㄧ敤鎴风殑瑙掑害锛屾湁浠ヤ笅涓ょ娑堣垂鑰呫€?/li>
- 涓诲姩娑堣垂鑰咃紙PullConsumer锛夛細浠巄rokers鎷夊彇娑堟伅骞舵秷璐广€?/li>
- 琚姩娑堣垂鑰咃紙PushConsumer锛夛細鍐呴儴涔熸槸閫氳繃pull鏂瑰紡鑾峰彇娑堟伅锛屽彧鏄繘琛屼簡鎵╁睍鍜屽皝瑁咃紝骞剁粰鐢ㄦ埛棰勭暀浜嗕竴涓洖璋冩帴鍙e幓瀹炵幇锛屽綋娑堟伅鍒板簳鐨勬椂鍊欎細鎵ц鐢ㄦ埛鑷畾涔夌殑鍥炶皟鎺ュ彛銆?/li>
- 娑堣垂鑰呯粍锛圕onsumer Group锛夛細鍜岀敓浜ц€呯粍绫讳技銆傚叾浣滅敤浣撶幇鍦ㄥ疄鐜版秷璐硅€呯殑璐熻浇鍧囪 鍜屽閿欙紝鏈変簡娑堣垂鑰呯粍鍙樺緱寮傚父瀹规槗銆傞渶瑕佹敞鎰忕殑鏄細鍚屼竴涓秷璐硅€呯粍鐨勬瘡涓秷璐硅€呭疄渚嬭闃呯殑涓婚蹇呴』鐩稿悓銆?/li>
- 涓婚锛圱opic锛夛細涓婚灏辨槸娑堟伅浼犻€掔殑绫诲瀷銆備竴涓敓浜ц€呭疄渚嬪彲浠ュ彂閫佹秷鎭埌澶氫釜涓婚锛屽涓敓浜ц€呭疄渚嬩篃鍙互鍙戦€佹秷鎭埌鍚屼竴涓富棰樸€傚悓鏍风殑锛屽浜庢秷璐硅€呯鏉ヨ锛屼竴涓秷璐硅€呯粍鍙互璁㈤槄澶氫釜涓婚鐨勬秷鎭紝涓€涓富棰樼殑娑堟伅涔熷彲浠ヨ澶氫釜娑堣垂鑰呯粍璁㈤槄銆?/li>
- 娑堟伅锛圡essage锛夛細娑堟伅灏卞儚鏄綘浼犻€掍俊鎭殑淇″皝銆傛瘡涓秷鎭繀椤绘寚瀹氫竴涓富棰橈紝灏卞ソ姣旀瘡涓俊灏佷笂閮藉繀椤诲啓鏄庢敹浠朵汉銆?/li>
- 娑堟伅闃熷垪锛圡essage Queues锛夛細鍦ㄤ富棰樺唴閮紝閫昏緫鍒掑垎浜嗗涓瓙涓婚锛屾瘡涓瓙涓婚琚О涓烘秷鎭槦鍒椼€傝繖涓蹇靛湪瀹炵幇鏈€澶у苟鍙戞暟銆佹晠闅滃垏鎹㈢瓑鍔熻兘涓婃湁宸ㄥぇ鐨勪綔鐢ㄣ€?/li>
- 鏍囩锛圱ag锛夛細鏍囩锛屽彲浠ヨ璁や负鏄瓙涓婚銆傞€氬父鐢ㄤ簬鍖哄垎鍚屼竴涓富棰樹笅鐨勪笉鍚屼綔鐢ㄦ垨鑰呰涓嶅悓涓氬姟鐨勬秷鎭€傚悓鏃朵篃鏄伩鍏嶄富棰樺畾涔夎繃澶氬紩璧锋€ц兘闂锛岄€氬父鎯呭喌涓嬩竴涓敓浜ц€呯粍鍙悜涓€涓富棰樺彂閫佹秷鎭紝鍏朵腑涓嶅悓涓氬姟鐨勬秷鎭€氳繃鏍囩鎴栬€呰瀛愪富棰樻潵鍖哄垎銆?/li>
- 娑堟伅浠g悊锛圔roker锛夛細娑堟伅浠g悊鏄疪ockerMQ涓緢閲嶈鐨勮鑹层€傚畠鎺ユ敹鐢熶骇鑰呭彂閫佺殑娑堟伅锛岃繘琛屾秷鎭瓨鍌紝涓烘秷璐硅€呮媺鍙栨秷鎭湇鍔°€傚畠杩樺瓨鍌ㄦ秷鎭秷鑰楃浉鍏崇殑鍏冩暟鎹紝鍖呮嫭娑堣垂缇や綋锛屾秷璐硅繘搴﹀亸绉诲拰涓婚/闃熷垪淇℃伅銆?/li>
- 鍛藉悕鏈嶅姟锛圢ame Server锛夛細鍛藉悕鏈嶅姟浣滀负璺敱淇℃伅鎻愪緵绋嬪簭銆傜敓浜ц€?娑堣垂鑰呰繘琛屼富棰樻煡鎵俱€佹秷鎭唬鐞嗘煡鎵俱€佽鍙?鍐欏叆娑堟伅閮介渶瑕侀€氳繃鍛藉悕鏈嶅姟鑾峰彇璺敱淇℃伅銆?/li>
-
娑堟伅椤哄簭锛圡essage Order锛夛細褰撴垜浠娇鐢―efaultMQPushConsumer鏃讹紝鎴戜滑鍙互閫夋嫨浣跨敤鈥渙rderly鈥濊繕鏄€渃oncurrently鈥濄€?
-
orderly锛?/strong>娑堣垂娑堟伅鐨勬湁搴忓寲鎰忓懗鐫€娑堟伅琚敓浜ц€呮寜鐓ф瘡涓秷鎭槦鍒楀彂閫佺殑椤哄簭娑堣垂銆傚鏋滄偍姝e湪澶勭悊鍏ㄥ眬椤哄簭涓哄己鍒剁殑鍦烘櫙锛岃纭繚鎮ㄤ娇鐢ㄧ殑涓婚鍙湁涓€涓秷鎭槦鍒椼€傛敞鎰忥細濡傛灉鎸囧畾浜嗘秷璐归『搴忥紝鍒欐秷鎭秷璐圭殑鏈€澶у苟鍙戞€ф槸娑堣垂缁勮闃呯殑娑堟伅闃熷垪鏁般€?/li>
- concurrently锛?/strong>褰撳悓鏃舵秷璐规椂锛屾秷鎭秷璐圭殑鏈€澶у苟鍙戜粎闄愪簬涓烘瘡涓秷璐瑰鎴风鎸囧畾鐨勭嚎绋嬫睜銆傛敞鎰忥細姝ゆā寮忎笉鍐嶄繚璇佹秷鎭『搴忋€?/li>
-
orderly锛?/strong>娑堣垂娑堟伅鐨勬湁搴忓寲鎰忓懗鐫€娑堟伅琚敓浜ц€呮寜鐓ф瘡涓秷鎭槦鍒楀彂閫佺殑椤哄簭娑堣垂銆傚鏋滄偍姝e湪澶勭悊鍏ㄥ眬椤哄簭涓哄己鍒剁殑鍦烘櫙锛岃纭繚鎮ㄤ娇鐢ㄧ殑涓婚鍙湁涓€涓秷鎭槦鍒椼€傛敞鎰忥細濡傛灉鎸囧畾浜嗘秷璐归『搴忥紝鍒欐秷鎭秷璐圭殑鏈€澶у苟鍙戞€ф槸娑堣垂缁勮闃呯殑娑堟伅闃熷垪鏁般€?/li>
瀹夎涓庤皟璇?/h1>
瀹樻柟瑕佹眰鐨勭幆澧冿細
- 64bit OS, Linux/Unix/Mac is recommended;
- 64bit JDK 1.7+;
- Maven 3.2.x
- Git
鎴戠殑鐜锛氾紙鎴戝枩娆娇鐢ㄨ緝鏂扮殑鐗堟湰
锛?/h2>
- CentOS Linux release 7.3.1611;
- 64bit JDK 1.8.0_91;
- apache-maven-3.5.0;
- Git聽1.8.3.1
瀹夎jdk
楹荤儲鍚勪綅鐪嬪畼鑷鎼滅储锛岃祫鏂欏鐨勫悡浜恒€傘€傘€?img alt="寰瑧" style="border-style: none; max-width: 100%;" src="http://static.blog.csdn.net/xheditor/xheditor_emot/default/smile.gif">
瀹夎maven
鍏堝幓瀹樼綉涓嬭浇maven

鐒跺悗涓婁紶鍒板畨瑁呯洰褰曪紝瑙e帇锛?/blockquote>
- sudo聽tar聽zxvf聽apache-maven-3.5.0-bin.tar.gz聽聽
瑙e帇瀹屾垚璁剧疆鐜鍙橀噺锛?/blockquote>
- sudo聽vi聽/etc/profile聽聽

鐒跺悗浣跨幆澧冨彉閲忕敓鏁堬細
- source聽/etc/profile聽聽
鏈€鍚庨獙璇佹槸鍚﹀畨瑁呮垚鍔燂細
- mvn聽-v聽聽
瀹夎Git锛坰o easy
锛?/h3>
鍏堟鏌ョ湅鐪嬫槸鍚﹀凡缁忓畨瑁呰繃浜嗭細
- git聽--version聽聽
濡傛灉娌℃湁灏卞紑濮嬪畨瑁咃細
- sudo聽yum聽install聽git聽聽
瀹夎瀹屾瘯鍐嶇湅鐪嬶細
- git聽--version聽聽
涓嬮潰杩涜RocketMq瀹夎
缂栬瘧锛?/div>
- >聽git聽clone聽https://github.com/apache/incubator-rocketmq.git聽聽
- >聽cd聽incubator-rocketmq聽聽
- >聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
- >聽cd聽target/apache-rocketmq-all聽聽
鍦ㄦ墽琛宮vn缂栬瘧鐨勬椂鍊欙紝浣犲彲鑳戒細閬囧埌濡備笅鐨勯棶棰橈細
杩欐槸鐢变簬娌℃湁鏉冮檺鍒涘缓鐩綍閫犳垚鐨勩€傛墍浠ワ紝瑕佷箞浣犲垏鎹㈠埌root鐢ㄦ埛锛岃涔堜娇鐢╯udo锛?/div>
- sudo聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
鎻愮ず锛歴udo: mvn: command not found銆傚ソ鍚э紝涔熸槸閱変簡銆傛垜浠繕闇€瑕佸湪浣犲綋鍓嶇敤鎴风殑Home鐩綍涓嬬殑涓€涓殣钘忔枃浠讹紙.bashrc锛変腑娣诲姞鐐逛笢瑗匡細
- >聽cd聽~聽聽
- >聽sudo聽vi聽.bashrc聽聽
娣诲姞瀹屾垚鍚庯紝鎵ц锛歴ource聽.bashrc 聽浣夸慨鏀圭敓鏁堛€傜劧鍚庡啀閲嶆柊鎵ц鐪嬬湅锛?/span>
- sudo聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
鏃堕棿绋嶅井鏈夌偣闀匡紝鎴戠殑鐜鐢ㄤ簡16鍒嗛挓锛岃鐪嬪畼鑰愬績绛夊緟锛屽畬鎴愬悗濡備笅鍥撅細
鍚姩RocketMQ
淇敼榛樿閰嶇疆
鐢变簬RocketMQ榛樿閰嶇疆瑕佹眰寰堥珮锛屾瘮濡傚唴瀛樿嚦灏戝氨瑕?涓狦锛屽紑鍙戣皟璇曠幆澧冩牴鏈悆涓嶆秷锛屾墍浠ユ垜浠紑濮嬪惎鍔ㄥ墠闇€瑕佸厛淇敼杩欎簺鍙傛暟銆傚惁鍒欑殑璇濓紝鎴戜滑寰堟湁浼氶亣鍒板唴瀛樺垎閰嶆垨鑰呬笉澶熺殑闂銆?/div>
淇敼target/apache-rocketmq-all/bin/runserver.sh
- JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽-XX:PermSize=128m聽-XX:MaxPermSize=320m"聽聽
淇敼target/apache-rocketmq-all/bin/runbroker.sh
- JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽聽
淇敼target/apache-rocketmq-all/bin/tools.sh
- JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽-XX:PermSize=128m聽-XX:MaxPermSize=128m"聽聽
鍚姩NameServer
杩涘叆target/apache-rocketmq-all鐩綍涓?/div>
- >聽nohup聽sh聽bin/mqnamesrv聽&聽聽
- >聽tail聽-f聽~/logs/rocketmqlogs/namesrv.log聽聽
- The聽Name聽Server聽boot聽success...聽聽
鍚姩Broker
- >聽nohup聽sh聽bin/mqbroker聽-n聽localhost:9876聽&聽聽
- >聽tail聽-f聽~/logs/rocketmqlogs/broker.log聽聽聽
- The聽broker[%s,聽172.17.0.1:10911]聽boot聽success...聽聽
寮€鏀剧鍙?/h3>
- sudo聽vi聽/etc/sysconfig/iptables聽聽
鐒跺悗閲嶅惎鐢熸晥锛?/div>
- sudo聽systemctl聽restart聽iptables聽聽
娣诲姞ROCKETMQ_HOME鐜鍙橀噺
- sudo聽vi聽/etc/profile聽聽
- source聽/etc/profile聽聽
java瀹㈡埛绔?/h1>
pom.xml
- <rocketmq.version>4.0.0-incubating</rocketmq.version>聽聽
- 聽聽
- <dependency>聽聽
- 聽聽聽聽<groupId>org.apache.rocketmq</groupId>聽聽
- 聽聽聽聽<artifactId>rocketmq-client</artifactId>聽聽
- 聽聽聽聽<version>${rocketmq.version}</version>聽聽
- </dependency>聽聽
- <dependency>聽聽
- 聽聽聽聽<groupId>org.apache.rocketmq</groupId>聽聽
- 聽聽聽聽<artifactId>rocketmq-common</artifactId>聽聽
- 聽聽聽聽<version>${rocketmq.version}</version>聽聽
- </dependency>聽聽
鐢熶骇鑰?/h2>
- import聽org.apache.rocketmq.client.exception.MQClientException;聽聽
- import聽org.apache.rocketmq.client.producer.DefaultMQProducer;聽聽
- import聽org.apache.rocketmq.client.producer.SendResult;聽聽
- import聽org.apache.rocketmq.common.message.Message;聽聽
- 聽聽
- import聽java.util.concurrent.TimeUnit;聽聽
- 聽聽
- public聽class聽Producer聽{聽聽
- 聽聽聽聽public聽static聽void聽main(String[]聽args)聽throws聽MQClientException,聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽InterruptedException聽{聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽涓€涓簲鐢ㄥ垱寤轰竴涓狿roducer锛岀敱搴旂敤鏉ョ淮鎶ゆ瀵硅薄锛屽彲浠ヨ缃负鍏ㄥ眬瀵硅薄鎴栬€呭崟渚?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛歅roducerGroupName闇€瑕佺敱搴旂敤鏉ヤ繚璇佸敮涓€<br>聽
- 聽聽聽聽聽聽聽聽聽*聽ProducerGroup杩欎釜姒傚康鍙戦€佹櫘閫氱殑娑堟伅鏃讹紝浣滅敤涓嶅ぇ锛屼絾鏄彂閫佸垎甯冨紡浜嬪姟娑堟伅鏃讹紝姣旇緝鍏抽敭锛?/span>聽
- 聽聽聽聽聽聽聽聽聽*聽鍥犱负鏈嶅姟鍣ㄤ細鍥炴煡杩欎釜Group涓嬬殑浠绘剰涓€涓狿roducer聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽DefaultMQProducer聽producer聽=聽new聽DefaultMQProducer("ProducerGroupName");聽聽
- 聽聽聽聽聽聽聽聽producer.setNamesrvAddr("192.168.56.101:9876");聽聽
- 聽聽聽聽聽聽聽聽producer.setInstanceName("Producer");聽聽
- 聽聽聽聽聽聽聽聽producer.setVipChannelEnabled(false);聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽Producer瀵硅薄鍦ㄤ娇鐢ㄤ箣鍓嶅繀椤昏璋冪敤start鍒濆鍖栵紝鍒濆鍖栦竴娆″嵆鍙?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氬垏璁颁笉鍙互鍦ㄦ瘡娆″彂閫佹秷鎭椂锛岄兘璋冪敤start鏂规硶聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽producer.start();聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽涓嬮潰杩欐浠g爜琛ㄦ槑涓€涓狿roducer瀵硅薄鍙互鍙戦€佸涓猼opic锛屽涓猼ag鐨勬秷鎭€?/span>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛歴end鏂规硶鏄悓姝ヨ皟鐢紝鍙涓嶆姏寮傚父灏辨爣璇嗘垚鍔熴€備絾鏄彂閫佹垚鍔熶篃鍙細鏈夊绉嶇姸鎬侊紝<br>聽
- 聽聽聽聽聽聽聽聽聽*聽渚嬪娑堟伅鍐欏叆Master鎴愬姛锛屼絾鏄疭lave涓嶆垚鍔燂紝杩欑鎯呭喌娑堟伅灞炰簬鎴愬姛锛屼絾鏄浜庝釜鍒簲鐢ㄥ鏋滃娑堟伅鍙潬鎬ц姹傛瀬楂橈紝<br>聽
- 聽聽聽聽聽聽聽聽聽*聽闇€瑕佸杩欑鎯呭喌鍋氬鐞嗐€傚彟澶栵紝娑堟伅鍙兘浼氬瓨鍦ㄥ彂閫佸け璐ョ殑鎯呭喌锛屽け璐ラ噸璇曠敱搴旂敤鏉ュ鐞嗐€?/span>聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽for聽(int聽i聽=聽0;聽i聽<聽1;聽i++)聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽try聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest1",//聽topic聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagA",//聽tag聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID001",//聽key聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest2",//聽topic聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagB",//聽tag聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID0034",//聽key聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest3",//聽topic聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagC",//聽tag聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID061",//聽key聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽}聽catch聽(Exception聽e)聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽e.printStackTrace();聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽TimeUnit.MILLISECONDS.sleep(1000);聽聽
- 聽聽聽聽聽聽聽聽}聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽搴旂敤閫€鍑烘椂锛岃璋冪敤shutdown鏉ユ竻鐞嗚祫婧愶紝鍏抽棴缃戠粶杩炴帴锛屼粠MetaQ鏈嶅姟鍣ㄤ笂娉ㄩ攢鑷繁聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氭垜浠缓璁簲鐢ㄥ湪JBOSS銆乀omcat绛夊鍣ㄧ殑閫€鍑洪挬瀛愰噷璋冪敤shutdown鏂规硶聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽producer.shutdown();聽聽
- 聽聽聽聽}聽聽
- }聽聽
娑堣垂鑰?/h2>
聽
- import聽org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;聽聽
- import聽org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;聽聽
- import聽org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;聽聽
- import聽org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;聽聽
- import聽org.apache.rocketmq.client.exception.MQClientException;聽聽
- import聽org.apache.rocketmq.common.message.MessageExt;聽聽
- 聽聽
- import聽java.util.List;聽聽
- 聽聽
- public聽class聽PushConsumer聽{聽聽
- 聽聽
- 聽聽聽聽/**聽
- 聽聽聽聽聽*聽褰撳墠渚嬪瓙鏄疨ushConsumer鐢ㄦ硶锛屼娇鐢ㄦ柟寮忕粰鐢ㄦ埛鎰熻鏄秷鎭粠RocketMQ鏈嶅姟鍣ㄦ帹鍒颁簡搴旂敤瀹㈡埛绔€?lt;br>聽
- 聽聽聽聽聽*聽浣嗘槸瀹為檯PushConsumer鍐呴儴鏄娇鐢ㄩ暱杞Pull鏂瑰紡浠嶮etaQ鏈嶅姟鍣ㄦ媺娑堟伅锛岀劧鍚庡啀鍥炶皟鐢ㄦ埛Listener鏂规硶<br>聽
- 聽聽聽聽聽*/聽聽
- 聽聽聽聽public聽static聽void聽main(String[]聽args)聽throws聽InterruptedException,聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽MQClientException聽{聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽涓€涓簲鐢ㄥ垱寤轰竴涓狢onsumer锛岀敱搴旂敤鏉ョ淮鎶ゆ瀵硅薄锛屽彲浠ヨ缃负鍏ㄥ眬瀵硅薄鎴栬€呭崟渚?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛欳onsumerGroupName闇€瑕佺敱搴旂敤鏉ヤ繚璇佸敮涓€聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽DefaultMQPushConsumer聽consumer聽=聽new聽DefaultMQPushConsumer(聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"ConsumerGroupName");聽聽
- 聽聽聽聽聽聽聽聽consumer.setNamesrvAddr("192.168.56.101:9876");聽聽
- 聽聽聽聽聽聽聽聽consumer.setInstanceName("Consumber");聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽璁㈤槄鎸囧畾topic涓媡ags鍒嗗埆绛変簬TagA鎴朤agC鎴朤agD聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽consumer.subscribe("TopicTest1",聽"TagA聽||聽TagC聽||聽TagD");聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽璁㈤槄鎸囧畾topic涓嬫墍鏈夋秷鎭?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氫竴涓猚onsumer瀵硅薄鍙互璁㈤槄澶氫釜topic聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽consumer.subscribe("TopicTest2",聽"*");聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽consumer.registerMessageListener(new聽MessageListenerConcurrently()聽{聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽*聽榛樿msgs閲屽彧鏈変竴鏉℃秷鎭紝鍙互閫氳繃璁剧疆consumeMessageBatchMaxSize鍙傛暟鏉ユ壒閲忔帴鏀舵秷鎭?/span>聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽@Override聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽public聽ConsumeConcurrentlyStatus聽consumeMessage(聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽List<MessageExt>聽msgs,聽ConsumeConcurrentlyContext聽context)聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(Thread.currentThread().getName()聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽+聽"聽Receive聽New聽Messages:聽"聽+聽msgs.size());聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽MessageExt聽msg聽=聽msgs.get(0);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽if聽(msg.getTopic().equals("TopicTest1"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTopicTest1鐨勬秷璐归€昏緫聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽if聽(msg.getTags()聽!=聽null聽&&聽msg.getTags().equals("TagA"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagA鐨勬秷璐?/span>聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(new聽String(msg.getBody()));聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽else聽if聽(msg.getTags()聽!=聽null聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽&&聽msg.getTags().equals("TagC"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagC鐨勬秷璐?/span>聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽else聽if聽(msg.getTags()聽!=聽null聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽&&聽msg.getTags().equals("TagD"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagD鐨勬秷璐?/span>聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽else聽if聽(msg.getTopic().equals("TopicTest2"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(new聽String(msg.getBody()));聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽return聽ConsumeConcurrentlyStatus.CONSUME_SUCCESS;聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽聽聽聽聽聽聽});聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽Consumer瀵硅薄鍦ㄤ娇鐢ㄤ箣鍓嶅繀椤昏璋冪敤start鍒濆鍖栵紝鍒濆鍖栦竴娆″嵆鍙?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽consumer.start();聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽System.out.println("Consumer聽Started.");聽聽
- 聽聽聽聽}聽聽
- }聽聽
-
聽
聽
http://blog.csdn.net/jayjjb/article/details/69948357
文章评论
相关解决方案
瀹樻柟瑕佹眰鐨勭幆澧冿細
- 64bit OS, Linux/Unix/Mac is recommended;
- 64bit JDK 1.7+;
- Maven 3.2.x
- Git
鎴戠殑鐜锛氾紙鎴戝枩娆娇鐢ㄨ緝鏂扮殑鐗堟湰
锛?/h2>
- CentOS Linux release 7.3.1611;
- 64bit JDK 1.8.0_91;
- apache-maven-3.5.0;
- Git聽1.8.3.1
瀹夎jdk
楹荤儲鍚勪綅鐪嬪畼鑷鎼滅储锛岃祫鏂欏鐨勫悡浜恒€傘€傘€?img alt="寰瑧" style="border-style: none; max-width: 100%;" src="http://static.blog.csdn.net/xheditor/xheditor_emot/default/smile.gif">
瀹夎maven
鍏堝幓瀹樼綉涓嬭浇maven
鐒跺悗涓婁紶鍒板畨瑁呯洰褰曪紝瑙e帇锛?/blockquote>
- sudo聽tar聽zxvf聽apache-maven-3.5.0-bin.tar.gz聽聽
瑙e帇瀹屾垚璁剧疆鐜鍙橀噺锛?/blockquote>
- sudo聽vi聽/etc/profile聽聽
鐒跺悗浣跨幆澧冨彉閲忕敓鏁堬細
- source聽/etc/profile聽聽
鏈€鍚庨獙璇佹槸鍚﹀畨瑁呮垚鍔燂細
- mvn聽-v聽聽
瀹夎Git锛坰o easy
锛?/h3>
鍏堟鏌ョ湅鐪嬫槸鍚﹀凡缁忓畨瑁呰繃浜嗭細
- git聽--version聽聽
濡傛灉娌℃湁灏卞紑濮嬪畨瑁咃細
- sudo聽yum聽install聽git聽聽
瀹夎瀹屾瘯鍐嶇湅鐪嬶細
- git聽--version聽聽
涓嬮潰杩涜RocketMq瀹夎
缂栬瘧锛?/div>
- >聽git聽clone聽https://github.com/apache/incubator-rocketmq.git聽聽
- >聽cd聽incubator-rocketmq聽聽
- >聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
- >聽cd聽target/apache-rocketmq-all聽聽
鍦ㄦ墽琛宮vn缂栬瘧鐨勬椂鍊欙紝浣犲彲鑳戒細閬囧埌濡備笅鐨勯棶棰橈細杩欐槸鐢变簬娌℃湁鏉冮檺鍒涘缓鐩綍閫犳垚鐨勩€傛墍浠ワ紝瑕佷箞浣犲垏鎹㈠埌root鐢ㄦ埛锛岃涔堜娇鐢╯udo锛?/div>
- sudo聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
鎻愮ず锛歴udo: mvn: command not found銆傚ソ鍚э紝涔熸槸閱変簡銆傛垜浠繕闇€瑕佸湪浣犲綋鍓嶇敤鎴风殑Home鐩綍涓嬬殑涓€涓殣钘忔枃浠讹紙.bashrc锛変腑娣诲姞鐐逛笢瑗匡細
- >聽cd聽~聽聽
- >聽sudo聽vi聽.bashrc聽聽
娣诲姞瀹屾垚鍚庯紝鎵ц锛歴ource聽.bashrc 聽浣夸慨鏀圭敓鏁堛€傜劧鍚庡啀閲嶆柊鎵ц鐪嬬湅锛?/span>
- sudo聽mvn聽clean聽package聽install聽-Prelease-all聽assembly:assembly聽-U聽聽
鏃堕棿绋嶅井鏈夌偣闀匡紝鎴戠殑鐜鐢ㄤ簡16鍒嗛挓锛岃鐪嬪畼鑰愬績绛夊緟锛屽畬鎴愬悗濡備笅鍥撅細鍚姩RocketMQ
淇敼榛樿閰嶇疆
鐢变簬RocketMQ榛樿閰嶇疆瑕佹眰寰堥珮锛屾瘮濡傚唴瀛樿嚦灏戝氨瑕?涓狦锛屽紑鍙戣皟璇曠幆澧冩牴鏈悆涓嶆秷锛屾墍浠ユ垜浠紑濮嬪惎鍔ㄥ墠闇€瑕佸厛淇敼杩欎簺鍙傛暟銆傚惁鍒欑殑璇濓紝鎴戜滑寰堟湁浼氶亣鍒板唴瀛樺垎閰嶆垨鑰呬笉澶熺殑闂銆?/div>淇敼target/apache-rocketmq-all/bin/runserver.sh
- JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽-XX:PermSize=128m聽-XX:MaxPermSize=320m"聽聽
淇敼target/apache-rocketmq-all/bin/runbroker.sh
- JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽聽
淇敼target/apache-rocketmq-all/bin/tools.sh
- JAVA_OPT="${JAVA_OPT}聽-server聽-Xms256m聽-Xmx256m聽-Xmn128m聽-XX:PermSize=128m聽-XX:MaxPermSize=128m"聽聽
鍚姩NameServer
杩涘叆target/apache-rocketmq-all鐩綍涓?/div>
- >聽nohup聽sh聽bin/mqnamesrv聽&聽聽
- >聽tail聽-f聽~/logs/rocketmqlogs/namesrv.log聽聽
- The聽Name聽Server聽boot聽success...聽聽
鍚姩Broker
- >聽nohup聽sh聽bin/mqbroker聽-n聽localhost:9876聽&聽聽
- >聽tail聽-f聽~/logs/rocketmqlogs/broker.log聽聽聽
- The聽broker[%s,聽172.17.0.1:10911]聽boot聽success...聽聽
寮€鏀剧鍙?/h3>
- sudo聽vi聽/etc/sysconfig/iptables聽聽
鐒跺悗閲嶅惎鐢熸晥锛?/div>
- sudo聽systemctl聽restart聽iptables聽聽
娣诲姞ROCKETMQ_HOME鐜鍙橀噺
- sudo聽vi聽/etc/profile聽聽
- source聽/etc/profile聽聽
java瀹㈡埛绔?/h1>
pom.xml
- <rocketmq.version>4.0.0-incubating</rocketmq.version>聽聽
- 聽聽
- <dependency>聽聽
- 聽聽聽聽<groupId>org.apache.rocketmq</groupId>聽聽
- 聽聽聽聽<artifactId>rocketmq-client</artifactId>聽聽
- 聽聽聽聽<version>${rocketmq.version}</version>聽聽
- </dependency>聽聽
- <dependency>聽聽
- 聽聽聽聽<groupId>org.apache.rocketmq</groupId>聽聽
- 聽聽聽聽<artifactId>rocketmq-common</artifactId>聽聽
- 聽聽聽聽<version>${rocketmq.version}</version>聽聽
- </dependency>聽聽
鐢熶骇鑰?/h2>
- import聽org.apache.rocketmq.client.exception.MQClientException;聽聽
- import聽org.apache.rocketmq.client.producer.DefaultMQProducer;聽聽
- import聽org.apache.rocketmq.client.producer.SendResult;聽聽
- import聽org.apache.rocketmq.common.message.Message;聽聽
- 聽聽
- import聽java.util.concurrent.TimeUnit;聽聽
- 聽聽
- public聽class聽Producer聽{聽聽
- 聽聽聽聽public聽static聽void聽main(String[]聽args)聽throws聽MQClientException,聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽InterruptedException聽{聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽涓€涓簲鐢ㄥ垱寤轰竴涓狿roducer锛岀敱搴旂敤鏉ョ淮鎶ゆ瀵硅薄锛屽彲浠ヨ缃负鍏ㄥ眬瀵硅薄鎴栬€呭崟渚?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛歅roducerGroupName闇€瑕佺敱搴旂敤鏉ヤ繚璇佸敮涓€<br>聽
- 聽聽聽聽聽聽聽聽聽*聽ProducerGroup杩欎釜姒傚康鍙戦€佹櫘閫氱殑娑堟伅鏃讹紝浣滅敤涓嶅ぇ锛屼絾鏄彂閫佸垎甯冨紡浜嬪姟娑堟伅鏃讹紝姣旇緝鍏抽敭锛?/span>聽
- 聽聽聽聽聽聽聽聽聽*聽鍥犱负鏈嶅姟鍣ㄤ細鍥炴煡杩欎釜Group涓嬬殑浠绘剰涓€涓狿roducer聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽DefaultMQProducer聽producer聽=聽new聽DefaultMQProducer("ProducerGroupName");聽聽
- 聽聽聽聽聽聽聽聽producer.setNamesrvAddr("192.168.56.101:9876");聽聽
- 聽聽聽聽聽聽聽聽producer.setInstanceName("Producer");聽聽
- 聽聽聽聽聽聽聽聽producer.setVipChannelEnabled(false);聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽Producer瀵硅薄鍦ㄤ娇鐢ㄤ箣鍓嶅繀椤昏璋冪敤start鍒濆鍖栵紝鍒濆鍖栦竴娆″嵆鍙?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氬垏璁颁笉鍙互鍦ㄦ瘡娆″彂閫佹秷鎭椂锛岄兘璋冪敤start鏂规硶聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽producer.start();聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽涓嬮潰杩欐浠g爜琛ㄦ槑涓€涓狿roducer瀵硅薄鍙互鍙戦€佸涓猼opic锛屽涓猼ag鐨勬秷鎭€?/span>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛歴end鏂规硶鏄悓姝ヨ皟鐢紝鍙涓嶆姏寮傚父灏辨爣璇嗘垚鍔熴€備絾鏄彂閫佹垚鍔熶篃鍙細鏈夊绉嶇姸鎬侊紝<br>聽
- 聽聽聽聽聽聽聽聽聽*聽渚嬪娑堟伅鍐欏叆Master鎴愬姛锛屼絾鏄疭lave涓嶆垚鍔燂紝杩欑鎯呭喌娑堟伅灞炰簬鎴愬姛锛屼絾鏄浜庝釜鍒簲鐢ㄥ鏋滃娑堟伅鍙潬鎬ц姹傛瀬楂橈紝<br>聽
- 聽聽聽聽聽聽聽聽聽*聽闇€瑕佸杩欑鎯呭喌鍋氬鐞嗐€傚彟澶栵紝娑堟伅鍙兘浼氬瓨鍦ㄥ彂閫佸け璐ョ殑鎯呭喌锛屽け璐ラ噸璇曠敱搴旂敤鏉ュ鐞嗐€?/span>聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽for聽(int聽i聽=聽0;聽i聽<聽1;聽i++)聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽try聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest1",//聽topic聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagA",//聽tag聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID001",//聽key聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest2",//聽topic聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagB",//聽tag聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID0034",//聽key聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Message聽msg聽=聽new聽Message("TopicTest3",//聽topic聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"TagC",//聽tag聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"OrderID061",//聽key聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽("Hello聽MetaQ").getBytes());//聽body聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽SendResult聽sendResult聽=聽producer.send(msg);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(sendResult);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽}聽catch聽(Exception聽e)聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽e.printStackTrace();聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽TimeUnit.MILLISECONDS.sleep(1000);聽聽
- 聽聽聽聽聽聽聽聽}聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽搴旂敤閫€鍑烘椂锛岃璋冪敤shutdown鏉ユ竻鐞嗚祫婧愶紝鍏抽棴缃戠粶杩炴帴锛屼粠MetaQ鏈嶅姟鍣ㄤ笂娉ㄩ攢鑷繁聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氭垜浠缓璁簲鐢ㄥ湪JBOSS銆乀omcat绛夊鍣ㄧ殑閫€鍑洪挬瀛愰噷璋冪敤shutdown鏂规硶聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽producer.shutdown();聽聽
- 聽聽聽聽}聽聽
- }聽聽
娑堣垂鑰?/h2>
聽
- import聽org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;聽聽
- import聽org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;聽聽
- import聽org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;聽聽
- import聽org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;聽聽
- import聽org.apache.rocketmq.client.exception.MQClientException;聽聽
- import聽org.apache.rocketmq.common.message.MessageExt;聽聽
- 聽聽
- import聽java.util.List;聽聽
- 聽聽
- public聽class聽PushConsumer聽{聽聽
- 聽聽
- 聽聽聽聽/**聽
- 聽聽聽聽聽*聽褰撳墠渚嬪瓙鏄疨ushConsumer鐢ㄦ硶锛屼娇鐢ㄦ柟寮忕粰鐢ㄦ埛鎰熻鏄秷鎭粠RocketMQ鏈嶅姟鍣ㄦ帹鍒颁簡搴旂敤瀹㈡埛绔€?lt;br>聽
- 聽聽聽聽聽*聽浣嗘槸瀹為檯PushConsumer鍐呴儴鏄娇鐢ㄩ暱杞Pull鏂瑰紡浠嶮etaQ鏈嶅姟鍣ㄦ媺娑堟伅锛岀劧鍚庡啀鍥炶皟鐢ㄦ埛Listener鏂规硶<br>聽
- 聽聽聽聽聽*/聽聽
- 聽聽聽聽public聽static聽void聽main(String[]聽args)聽throws聽InterruptedException,聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽MQClientException聽{聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽涓€涓簲鐢ㄥ垱寤轰竴涓狢onsumer锛岀敱搴旂敤鏉ョ淮鎶ゆ瀵硅薄锛屽彲浠ヨ缃负鍏ㄥ眬瀵硅薄鎴栬€呭崟渚?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛欳onsumerGroupName闇€瑕佺敱搴旂敤鏉ヤ繚璇佸敮涓€聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽DefaultMQPushConsumer聽consumer聽=聽new聽DefaultMQPushConsumer(聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽"ConsumerGroupName");聽聽
- 聽聽聽聽聽聽聽聽consumer.setNamesrvAddr("192.168.56.101:9876");聽聽
- 聽聽聽聽聽聽聽聽consumer.setInstanceName("Consumber");聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽璁㈤槄鎸囧畾topic涓媡ags鍒嗗埆绛変簬TagA鎴朤agC鎴朤agD聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽consumer.subscribe("TopicTest1",聽"TagA聽||聽TagC聽||聽TagD");聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽璁㈤槄鎸囧畾topic涓嬫墍鏈夋秷鎭?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*聽娉ㄦ剰锛氫竴涓猚onsumer瀵硅薄鍙互璁㈤槄澶氫釜topic聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽consumer.subscribe("TopicTest2",聽"*");聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽consumer.registerMessageListener(new聽MessageListenerConcurrently()聽{聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽*聽榛樿msgs閲屽彧鏈変竴鏉℃秷鎭紝鍙互閫氳繃璁剧疆consumeMessageBatchMaxSize鍙傛暟鏉ユ壒閲忔帴鏀舵秷鎭?/span>聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽@Override聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽public聽ConsumeConcurrentlyStatus聽consumeMessage(聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽List<MessageExt>聽msgs,聽ConsumeConcurrentlyContext聽context)聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(Thread.currentThread().getName()聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽+聽"聽Receive聽New聽Messages:聽"聽+聽msgs.size());聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽MessageExt聽msg聽=聽msgs.get(0);聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽if聽(msg.getTopic().equals("TopicTest1"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTopicTest1鐨勬秷璐归€昏緫聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽if聽(msg.getTags()聽!=聽null聽&&聽msg.getTags().equals("TagA"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagA鐨勬秷璐?/span>聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(new聽String(msg.getBody()));聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽else聽if聽(msg.getTags()聽!=聽null聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽&&聽msg.getTags().equals("TagC"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagC鐨勬秷璐?/span>聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽else聽if聽(msg.getTags()聽!=聽null聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽&&聽msg.getTags().equals("TagD"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽//聽鎵цTagD鐨勬秷璐?/span>聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽else聽if聽(msg.getTopic().equals("TopicTest2"))聽{聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽System.out.println(new聽String(msg.getBody()));聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽return聽ConsumeConcurrentlyStatus.CONSUME_SUCCESS;聽聽
- 聽聽聽聽聽聽聽聽聽聽聽聽}聽聽
- 聽聽聽聽聽聽聽聽});聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽/**聽
- 聽聽聽聽聽聽聽聽聽*聽Consumer瀵硅薄鍦ㄤ娇鐢ㄤ箣鍓嶅繀椤昏璋冪敤start鍒濆鍖栵紝鍒濆鍖栦竴娆″嵆鍙?lt;br>聽
- 聽聽聽聽聽聽聽聽聽*/聽聽
- 聽聽聽聽聽聽聽聽consumer.start();聽聽
- 聽聽
- 聽聽聽聽聽聽聽聽System.out.println("Consumer聽Started.");聽聽
- 聽聽聽聽}聽聽
- }聽聽
聽
聽
http://blog.csdn.net/jayjjb/article/details/69948357
文章评论
相关解决方案