Java HTTP协议收发MQ 消息代码实例详解
发布时间 - 2026-01-11 00:45:02 点击率:次1. 准备环境

在工程 POM 文件添加 HTTP Java 客户端的依赖。
<dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-client</artifactId> <version>9.3.4.RC1</version> </dependency> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.1.11</version> </dependency>
2. 运行代码配置(user.properties)
您需要设置配置文件(user.properties)的相关内容,具体请参考申请 MQ 资源 。
#您在控制台创建的Topic Topic=xxx #公测url URL=http://publictest-rest.ons.aliyun.com #阿里云身份验证码 Ak=xxx #阿里云身份验证密钥 Sk=xxx #MQ控制台创建的Producer ID ProducerID=xxx #MQ控制台创建的Consumer ID ConsumerID=xxx
说明:URL 中的 Key,Tag以及 POST Content-Type 没有任何的限制,只要确保Key 和 Tag 相同唯一即可,可以放在 user.properties 里面。
3. HTTP 发送消息示例代码
您可以按以下说明设置相应参数并测试 HTTP 消息发送功能。
package com.aliyun.openservice.ons.http.demo;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Properties;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
public class HttpProducer {
public static String SIGNATURE="Signature";
public static String NUM="num";
public static String CONSUMERID="ConsumerID";
public static String PRODUCERID="ProducerID";
public static String TIMEOUT="timeout";
public static String TOPIC="Topic";
public static String AK="AccessKey";
public static String BODY="body";
public static String MSGHANDLE="msgHandle";
public static String TIME="time";
public static void main(String[] args) throws Exception {
HttpClient httpClient=new HttpClient();
httpClient.setMaxConnectionsPerDestination(1);
httpClient.start();
Properties properties=new Properties();
properties.load(HttpProducer.class.getClassLoader().getResourceAsStream("user.properties"));
String topic=properties.getProperty("Topic"); //请在user.properties配置您的Topic
String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/
String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak
String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk
String pid=properties.getProperty("ProducerID");//请在user.properties配置您的Producer ID
String date=String.valueOf(new Date().getTime());
String sign=null;
String body="hello ons http";
String NEWLINE="\n";
String signString;
for (int i = 0; i < 10; i++) {
date=String.valueOf(new Date().getTime());
Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http");
ContentProvider content=new StringContentProvider(body);
req.content(content);
signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date;
System.out.println(signString);
sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
req.header(SIGNATURE, sign);
req.header(AK, ak);
req.header(PRODUCERID, pid);
ContentResponse response;
response=req.send();
System.out.println("send msg:"+response.getStatus()+response.getContentAsString());
}
}
}
4. HTTP接收消息示例代码
请按以下说明设置相应参数并测试 HTTP 消息接收功能。
package com.aliyun.openservice.ons.http.demo;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservice.ons.mqtt.demo.MqttProducer;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
public class HttpConsumer {
public static String SIGNATURE="Signature";
public static String NUM="num";
public static String CONSUMERID="ConsumerID";
public static String PRODUCERID="ProducerID";
public static String TIMEOUT="timeout";
public static String TOPIC="Topic";
public static String AK="AccessKey";
public static String BODY="body";
public static String MSGHANDLE="msgHandle";
public static String TIME="time";
public static void main(String[] args) throws Exception {
HttpClient httpClient=new HttpClient();
httpClient.setMaxConnectionsPerDestination(1);
httpClient.start();
Properties properties=new Properties();
properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties"));
String topic=properties.getProperty("Topic"); //请在user.properties配置您的topic
String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/
String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak
String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk
String cid=properties.getProperty("ConsumerID");//请在user.properties配置您的Consumer ID
String date=String.valueOf(new Date().getTime());
String sign=null;
String NEWLINE="\n";
String signString;
System.out.println(NEWLINE+NEWLINE);
while (true) {
try {
date=String.valueOf(new Date().getTime());
Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&num="+32);
req.method(HttpMethod.GET);
ContentResponse response;
signString=topic+NEWLINE+cid+NEWLINE+date;
sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
req.header(SIGNATURE, sign);
req.header(AK, ak);
req.header(CONSUMERID, cid);
long start=System.currentTimeMillis();
response=req.send();
System.out.println("get cost:"+(System.currentTimeMillis()-start)/1000
+" "+response.getStatus()+" "+response.getContentAsString());
List<SimpleMessage> list = null;
if (response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()) {
list=JSON.parseArray(response.getContentAsString(), SimpleMessage.class);
}
if (list==null||list.size()==0) {
Thread.sleep(100);
continue;
}
System.out.println("size is :"+list.size());
for (SimpleMessage simpleMessage : list) {
date=String.valueOf(new Date().getTime());
System.out.println("receive msg:"+simpleMessage.getBody()+" born time "+simpleMessage.getBornTime());
req=httpClient.POST(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date);
req.method(HttpMethod.DELETE);
signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date;
sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
req.header(SIGNATURE, sign);
req.header(AK, ak);
req.header(CONSUMERID, cid);
response=req.send();
System.out.println("delete msg:"+response.toString());
}
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
5. HTTP示例程序工具类
(1)消息封装类: SimpleMessage.java
package com.aliyun.openservice.ons.http.demo;
public class SimpleMessage {
private String body;
private String msgId;
private String bornTime;
private String msgHandle;
private int reconsumeTimes;
private String tag;
public void setTag(String tag) {
this.tag = tag;
}
public String getTag() {
return tag;
}
public int getReconsumeTimes() {
return reconsumeTimes;
}
public void setReconsumeTimes(int reconsumeTimes) {
this.reconsumeTimes = reconsumeTimes;
}
public void setMsgHandle(String msgHandle) {
this.msgHandle = msgHandle;
}
public String getMsgHandle() {
return msgHandle;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public String getMsgId() {
return msgId;
}
public void setMsgId(String msgId) {
this.msgId = msgId;
}
public String getBornTime() {
return bornTime;
}
public void setBornTime(String bornTime) {
this.bornTime = bornTime;
}
}
(2)字符串签名类: MD5.java
package com.aliyun.openservice.ons.http.demo;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.LoggerFactory;
public class MD5 {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);
private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
private static Map<Character, Integer> rDigits = new HashMap<Character, Integer>(16);
static {
for (int i = 0; i < digits.length; ++i) {
rDigits.put(digits[i], i);
}
}
private static MD5 me = new MD5();
private MessageDigest mHasher;
private final ReentrantLock opLock = new ReentrantLock();
private MD5() {
try {
this.mHasher = MessageDigest.getInstance("md5");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public static MD5 getInstance() {
return me;
}
public String getMD5String(String content) {
return this.bytes2string(this.hash(content));
}
public String getMD5String(byte[] content) {
return this.bytes2string(this.hash(content));
}
public byte[] getMD5Bytes(byte[] content) {
return this.hash(content);
}
public byte[] hash(String str) {
this.opLock.lock();
try {
byte[] bt = this.mHasher.digest(str.getBytes("utf-8"));
if (null == bt || bt.length != 16) {
throw new IllegalArgumentException("md5 need");
}
return bt;
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("unsupported utf-8 encoding", e);
} finally {
this.opLock.unlock();
}
}
public byte[] hash(byte[] data) {
this.opLock.lock();
try {
byte[] bt = this.mHasher.digest(data);
if (null == bt || bt.length != 16) {
throw new IllegalArgumentException("md5 need");
}
return bt;
} finally {
this.opLock.unlock();
}
}
public String bytes2string(byte[] bt) {
int l = bt.length;
char[] out = new char[l << 1];
for (int i = 0, j = 0; i < l; i++) {
out[j++] = digits[(0xF0 & bt[i]) >>> 4];
out[j++] = digits[0x0F & bt[i]];
}
if (log.isDebugEnabled()) {
log.debug("[hash]" + new String(out));
}
return new String(out);
}
public byte[] string2bytes(String str) {
if (null == str) {
throw new NullPointerException("Argument is not allowed empty");
}
if (str.length() != 32) {
throw new IllegalArgumentException("String length must equals 32");
}
byte[] data = new byte[16];
char[] chs = str.toCharArray();
for (int i = 0; i < 16; ++i) {
int h = rDigits.get(chs[i * 2]).intValue();
int l = rDigits.get(chs[i * 2 + 1]).intValue();
data[i] = (byte) ((h & 0x0F) << 4 | l & 0x0F);
}
return data;
}
}
希望本篇文章对您有所帮助
# Java
# HTTP协议收发MQ
# 消息代码实
# Java收
# HTTP收发MQ消息
# 基于JAVA中Jersey处理Http协议中的Multipart的详解
# javaweb中Http协议详解
# Java获取http和https协议返回的json数据
# Java与Http协议的详细介绍
# Java下载远程服务器文件到本地(基于http协议和ssh2协议)
# Java9新特性对HTTP2协议支持与非阻塞HTTP API
# Java中的 HTTP 协议原理详解
# 您的
# 请在
# 公测
# 放在
# 相关内容
# 没有任何
# 您可以
# 您在
# 您需要
# 请按
# 验证码
# 对您
# 请参考
# 配置文件
# 身份验证
# 客户端
# 发送消息
# api
# HttpClient
# PRODUCERID
相关栏目:
【
网站优化151355 】
【
网络推广146373 】
【
网络技术251813 】
【
AI营销90571 】
相关推荐:
Python文件异常处理策略_健壮性说明【指导】
如何快速建站并高效导出源代码?
惠州网站建设制作推广,惠州市华视达文化传媒有限公司怎么样?
html如何与html链接_实现多个HTML页面互相链接【互相】
制作旅游网站html,怎样注册旅游网站?
Windows Hello人脸识别突然无法使用
教学论文网站制作软件有哪些,写论文用什么软件
?
高性价比服务器租赁——企业级配置与24小时运维服务
专业商城网站制作公司有哪些,pi商城官网是哪个?
详解jQuery中的事件
如何快速搭建虚拟主机网站?新手必看指南
Laravel如何生成URL和重定向?(路由助手函数)
阿里云网站搭建费用解析:服务器价格与建站成本优化指南
今日头条微视频如何找选题 今日头条微视频找选题技巧【指南】
Laravel Asset编译怎么配置_Laravel Vite前端构建工具使用
JavaScript如何操作视频_媒体API怎么控制播放
Laravel怎么生成URL_Laravel路由命名与URL生成函数详解
javascript中对象的定义、使用以及对象和原型链操作小结
Laravel如何使用Gate和Policy进行权限控制_Laravel权限判定与策略规则配置
动图在线制作网站有哪些,滑动动图图集怎么做?
Laravel怎么返回JSON格式数据_Laravel API资源Response响应格式化【技巧】
手机网站制作与建设方案,手机网站如何建设?
Laravel如何实现用户角色和权限系统_Laravel角色权限管理机制
Android实现代码画虚线边框背景效果
Win11怎么恢复误删照片_Win11数据恢复工具使用【推荐】
Laravel如何使用withoutEvents方法临时禁用模型事件
laravel怎么为API路由添加签名中间件保护_laravel API路由签名中间件保护方法
html5怎么画眼睛_HT5用Canvas或SVG画眼球瞳孔加JS控制动态【绘制】
Laravel如何优雅地处理服务层_在Laravel中使用Service层和Repository层
深圳网站制作公司好吗,在深圳找工作哪个网站最好啊?
Laravel的契約(Contracts)是什么_深入理解Laravel Contracts与依赖倒置
如何在IIS7中新建站点?详细步骤解析
如何为不同团队 ID 动态生成多个独立按钮
如何撰写建站申请书?关键要点有哪些?
如何获取PHP WAP自助建站系统源码?
Windows10如何删除恢复分区_Win10 Diskpart命令强制删除分区
html5如何设置样式_HTML5样式设置方法与CSS应用技巧【教程】
Laravel如何实现本地化和多语言支持_Laravel多语言配置与翻译文件管理
Laravel如何使用Eloquent进行子查询
Laravel Blade模板引擎语法_Laravel Blade布局继承用法
Laravel如何将应用部署到生产服务器_Laravel生产环境部署流程
Laravel如何使用Livewire构建动态组件?(入门代码)
如何在景安服务器上快速搭建个人网站?
Swift开发中switch语句值绑定模式
免费视频制作网站,更新又快又好的免费电影网站?
Laravel项目如何进行性能优化_Laravel应用性能分析与优化技巧大全
Python3.6正式版新特性预览
如何用花生壳三步快速搭建专属网站?
Python自然语言搜索引擎项目教程_倒排索引查询优化案例
php增删改查怎么学_零基础入门php数据库操作必知基础【教程】

