IoT Amazon MQTT 客户端

什么是 AWS IoT?

AWS IoT 在联网设备(如传感器、执行器、嵌入式微控制器或智能设备)与 AWS 云之间提供安全的双向通信。这使您可以从多台设备收集遥测数据并存储和分析。您还可以创建应用程序,使用户能够通过手机或平板电脑控制这些设备。

 

消息代理

提供一种安全机制,使设备和 AWS IoT 应用程序能够相互发布和接收消息。您可以直接使用 MQTT 协议或通过 WebSocket 上的 MQTT 进行发布和订阅。

 

AWS IoT 消息代理是一个发布/订阅代理服务,能够向 AWS IoT 发送和接收消息。与 AWS IoT 通信时,客户端将消息发送到类似 Sensor/temp/room1 这样的主题地址。

 

消息代理随后将消息发送给所有已注册接收该主题消息的客户端。发送消息的行为称为发布,注册接收特定主题过滤器消息的行为称为订阅。

 

主题命名空间针对每个 AWS 账户和区域对是隔离的。例如,某个 AWS 账户的 Sensor/temp/room1 主题与另一个 AWS 账户的同名主题相互独立。区域也是如此。同一 AWS 账户在 us-east-1 的 Sensor/temp/room1 主题与 us-east-2 的同名主题相互独立。AWS IoT 不支持跨 AWS 账户和区域发送及接收消息。

 

消息代理维护所有客户端会话及每个会话的订阅列表。当消息发布到某个主题时,代理检查具有映射到该主题的订阅的会话,然后将发布消息转发给所有当前有已连接客户端的会话。

 

MQTT 客户端

TsgcIoTAmazon_MQTT_Client 是用于连接 AWS IoT 的组件。一个客户端只能连接到一个设备。客户端使用普通 MQTT 协议连接,并使用 X.509 客户端证书进行身份验证。

 

连接到 AWS IoT 时,客户端需要以下属性:

 

Amazon.ClientId:客户端标识符,可选。

Amazon.Endpoint:MQTT 客户端将连接的服务器名称。

Amazon.Port:默认使用端口 8883。如果端口为 443,则自动使用 ALPN 进行连接(需要自定义 Indy 版本)。

 

AWS IoT Core 支持使用 MQTT 和基于 WebSocket Secure(WSS)的 MQTT 协议的设备和客户端进行消息发布和订阅。下表列出了 AWS IoT 设备端点支持的协议以及它们使用的身份验证方法和端口。

 

协议 身份验证 端口 ALPN 协议名称
WebSocket 上的 MQTT Signature Version 4 443  
WebSocket 上的 MQTT 自定义身份验证 443  
MQTT X.509 客户端证书 443 x-amzn-mqtt-ca
MQTT X.509 客户端证书 8883  
MQTT 自定义身份验证 443 mqtt

 

 

证书身份验证

您需要在 Amazon AWS 控制台中创建证书,并设置其存储路径。

 

使用 OpenSSL 作为 IOHandler 时,必须在以下路径中设置证书

 

Certificate.Enabled:如果要使用证书,请设置为 True。

Certificate.CertFile:X.509 客户端证书的路径。

Certificate.KeyFile:X.509 客户端密钥文件的路径。

 

使用 SChannel 作为 IOHandler 时,请先将 PEM 证书 + 私钥转换为 PFX 证书。这需要 OpenSSL 二进制文件:

 


openssl pkcs12 -inkey 884ccf73ff-private.pem.key -in 884ccf73ff-certificate.pem.crt -export -out 884ccf73ff-certificate.pfx

然后设置以下路径(无需设置密钥文件,因为它已包含在证书中)。

 

Certificate.Enabled:如果要使用证书,请设置为 True。

Certificate.CertFile: PFX 证书的路径

 

SignatureV4 身份验证

您需要在 Amazon AWS 控制台中创建一个用户,并保存 Access 和 Secret 密钥,这些密钥将用于签署 WebSocket 请求。

 

SignatureV4.Enabled:若要使用此类型的身份验证,请将其设为 True。

SignatureV4.Region:您的设备所在的区域(例如:us-east-1)。

SignatureV4.AccessKey:在 Amazon 控制台中创建的访问密钥或作为临时凭证获取的访问密钥。

SignatureV4.SecretKey:在 Amazon 控制台中创建的密钥,或作为临时凭据获取

SignatureV4.SessionToken:(条件性)如果您正在使用临时安全凭证,请在此处设置安全令牌。

OpenSSL_Options:OpenSSL 库的配置。

APIVersion:允许定义将使用哪个 OpenSSL API 版本。

oslAPI_1_0: 使用 API 1.0 OpenSSL,这是 Indy 支持的最新版本

oslAPI_1_1:使用 OpenSSL API 1.1,需要我们定制的 Indy 库,并支持 OpenSSL 1.1.1 库(含 TLS 1.3 支持)。

oslAPI_3_0:使用 API 3.0 OpenSSL,需要我们的自定义 Indy 库,并允许使用 OpenSSL 3.0.0 库(支持 TLS 1.3)。

LibPath: 在此处您可以配置 openSSL 库的位置

oslpNone:这是默认值,OpenSSL 库应与二进制文件位于同一文件夹中,或位于已知路径中。

oslpDefaultFolder:自动设置所有 IDE 版本应查找 OpenSSL 库的路径。

oslpCustomFolder: 如果选择此选项,请在属性 LibPathCustom 中定义完整路径。

LibPathCustom:当 LibPath = oslpCustomFolder 时,在此处定义 openSSL 库所在的完整路径。

UnixSymLinks: 启用或禁用在 Unix 系统下加载符号链接(默认启用,OSX64 除外):

oslsSymLinksDefault: 默认情况下启用,但在 OSX64 下除外(MacOS Monterey 之后,不带版本号尝试加载库会失败)。

oslsSymLinksLoadFirst:先加载符号链接,然后再尝试加载版本库。

oslsSymLinksLoad: 在尝试加载版本库后加载符号链接。

oslsSymLinksDontLoad:不加载符号链接。

 

*SignatureV4 需要 Indy 10.5.7 及以上版本。

自定义身份验证

自定义认证使您能够通过授权器资源定义客户端的身份验证和授权方式。设备可通过请求头字段或查询参数(用于基于 WebSockets 的 MQTT 协议)传递凭证,或在 MQTT CONNECT 消息的用户名和密码字段中传递(用于 MQTT 及基于 WebSockets 的 MQTT 协议)。

 

CustomAuthentication.Enabled:若要使用此类身份验证,请设置为 True。

CustomAuthentication.Parameters:在此设置将传递给服务器的查询参数(默认为 /mqtt)

CustomAuthentication.Headers: 在此处可以设置自定义头字段。

CustomAuthentication.WebSockets:若设置为 true,连接将通过 WebSocket 协议工作,否则将通过纯 TCP 工作。

 

MQTTAuthentication.Enabled:如果您需要在 MQTT 连接中传递用户名/密码,请启用此属性。

MQTTAuthentication.Username:MQTT 连接的用户名

MQTTAuthentication.Password:MQTT 连接的密钥。

 

 

客户端可以选择发送 ClientId 以标识客户端连接,其他客户端可以订阅以在每次该客户端连接、订阅、断开连接时收到通知。

 

授权

如果无法使用 8883 端口并使用 TCP 作为传输协议(这是默认设置),Amazon 将通过"AWS IoT Core 策略"向客户端和订阅提供或拒绝授权。您很可能需要授权您的客户端 id。

登录您的 Amazon AWS 控制台,进入 IoT Core,访问菜单 "Secure/Policies",选择附加到您 IoT Thing 的策略,并在末尾查看连接的配置方式。示例:

 

{

"Effect": "Allow",

"Action": [

"iot:Connect"

],

"Resource": [

"arn:aws:iot:us-east-1:222178873557:client/sdk-java",

"arn:aws:iot:us-east-1:222178873557:client/basicPubSub",

"arn:aws:iot:us-east-1:222178873557:client/sdk-nodejs-*"

]

}

 

此配置表示只允许 ID 为 sdk-java、basicPubSub 和 sdk-nodejs-* 的客户端连接。请相应地更改并重试。

如果仍然无法连接,请启用日志并在 CloudWatch 中检查无法连接的原因。

 

其他属性

 

MQTTHeartBeat:若启用,尝试通过每 x 秒发送一次 ping 来保持 MQTT 连接活跃。

 

Interval:每次 ping 之间的秒数。

 

MQTTAuthentication:如果启用,将在 MQTT 连接中包含用户名和密码

 

UserName:用户名称

密码:密钥字符串

 

WatchDog:如果启用,当检测到意外断开连接时,将尝试自动重新连接到服务器。

 

Interval:两次重连尝试之间的秒数。

 

Attempts: 最大重连尝试次数;零表示无限制。

 

LogFile:若启用,将套接字消息保存到日志文件(对调试有用)。如果从多个线程访问,日志文件的访问不是线程安全的。

 

Enabled:如果启用,每次套接字接收和发送消息时,都会将其保存到文件中。

 

FileName:文件的完整路径。

 

实现

 

Amazon MQTT 实现基于 MQTT 3.1.1 版本,但在以下方面偏离了规范:

 

 

连接到 AWS IoT

首先,您需要登录 AWS 控制台,注册一个新设备并为该设备创建 X.509 证书。完成后,您可以创建一个新的 TsgcIoTAmazon_MQTT_Client 并连接到 AWS IoT 服务器。例如:

 


oClient := TsgcIoTAmazon_MQTT_Client.Create(nil);
oClient.Amazon.Endpoint := 'a2ohgdjqitsmij-ats.iot.us-west-2.amazonaws.com';
oClient.Amazon.ClientId := 'sgcWebSockets';
oClient.Certificate.CertFile := 'amazon-certificate.pem.crt';
oClient.Certificate.KeyFile := 'amazon-private.pem.key';
oClient.OnMQTTConnect := OnMQTTConnectEvent;
oClient.Active := True;
 
procedure OnMQTTConnect(Connection: TsgcWSConnection; const Session: Boolean; const ReturnCode: TmqttConnReturnCode);
begin
  ShowMessage('Connected to AWS');
end;

主题

消息 broker 使用主题将消息从发布客户端路由到订阅客户端。正斜杠(/)用于分隔主题层次结构。下表列出了订阅时可在主题过滤器中使用的通配符。# 必须是订阅主题中的最后一个字符。它作为通配符匹配当前树和所有子树。

例如,订阅 Sensor/# 将接收发布至 Sensor/、Sensor/temp、Sensor/temp/room1 的消息,但不接收发布至 Sensor 的消息。

+ 精确匹配主题层级中的一个元素。例如,订阅 Sensor/+/room1 将接收发布到 Sensor/temp/room1、Sensor/moisture/room1 等主题的消息。

 


oClient := TsgcIoTAmazon_MQTT_Client.Create(nil);
...
oClient.OnSubscribe := OnSubscribeEvent;
 
vPacketIdentifier := oClient.Subscribe('Sensor/moisture/room1');
  
procedure OnMQTTSubscribe(Connection: TsgcWSConnection; aPacketIdentifier: Word; aCodes: TsgcWSSUBACKS);
begin
  if vPacketIdentifier = aPacketIdentifier then
    ShowMessage('Subscribed to topic Sensor/moisture/room1'); 
end;
 
// Client, can send a message using Publish method.
oClient.Publish('Sensor/moisture/room1', '{"temp"=10}');
  
// Messages received from server, are dispatched OnMQTTPublishEvent.
// For extended payload access (string, bytes or stream), use OnMQTTPublishEx.
procedure OnMQTTPublish(Connection: TsgcWSConnection; aTopic, aText: string);
begin
  DoLog('Received Message: ' + aTopic + ' ' + aText);
end;

保留主题

以下方法用于订阅/发布到保留主题。

 

Subscribe_ClientConnected(const aClientId: String):当具有指定客户端 ID 的 MQTT 客户端连接到 AWS IoT 时,AWS IoT 会向此主题发布内容

Subscribe_ClientDisconnected(const aClientId: String): 当具有指定客户端 ID 的 MQTT 客户端断开与 AWS IoT 的连接时,AWS IoT 将发布到此主题。

Subscribe_ClientSubscribed(const aClientId: String): 当具有指定客户端 ID 的 MQTT 客户端订阅 MQTT 主题时,AWS IoT 将发布到此主题。

Subscribe_ClientUnSubscribed(const aClientId: String):当具有指定客户端 ID 的 MQTT 客户端取消订阅某个 MQTT 主题时,AWS IoT 会向此主题发布消息。

 

Publish_Rule(const aRuleName, aText: String):设备或应用程序发布到此主题以直接触发规则。

 

Publish_DeleteShadow(const aThingName, aText: String):设备或应用程序发布到此主题以删除影子

Subscribe_DeleteShadow(const aThingName: String):设备或应用程序订阅此主题以删除 Shadow

Subscribe_ShadowDeleted(const aThingName: String):当影子被删除时,Device Shadow 服务向此主题发送消息

Subscribe_ShadowRejected(const aThingName: String):当删除影子的请求被拒绝时,Device Shadow 服务向此主题发送消息

Publish_ShadowGet(const aThingName, aText: String):应用程序或设备向此主题发布空消息以获取影子

Subscribe_ShadowGet(const aThingName: String):应用程序或设备订阅此主题以获取 shadow

Subscribe_ShadowGetAccepted(const aThingName: String):当成功发出获取设备影子的请求时,Device Shadow 服务向此主题发送消息

Subscribe_ShadowGetRejected(const aThingName: String):当对 Shadow 的请求被拒绝时,Device Shadow 服务向此主题发送消息

Publish_ShadowUpdate(const aThingName, aText: String):设备或应用程序向此主题发布以更新设备影子

Subscribe_ShadowUpdateAccepted(const aThingName: String):当影子更新成功完成时,Device Shadow 服务会向此主题发送消息

Subscribe_ShadowUpdateRejected(const aThingName: String):当影子更新被拒绝时,Device Shadow 服务会向此主题发送消息

Subscribe_ShadowUpdateDelta(const aThingName: String):当在影子的 reported 和 desired 部分之间检测到差异时,Device Shadow 服务向此主题发送消息

Subscribe_ShadowUpdateDocuments(const aThingName: String):每当成功更新 shadow 时,AWS IoT 将状态文档发布到此主题

 

持久会话

持久会话表示与 MQTT 消息 broker 的持续连接。当客户端使用持久会话连接到 AWS IoT 消息 broker 时,消息 broker 将保存客户端在连接期间进行的所有订阅。客户端断开连接后,消息 broker 会存储未确认的 QoS 1 消息以及发布到客户端已订阅主题的新 QoS 1 消息。当客户端重新连接到持久会话时,所有订阅将恢复,所有存储的消息将以每秒最多 10 条的速率发送给客户端。

 

通过在 OnMQTTBeforeConnect 事件中将 cleanSession 参数设置为 False,可创建 MQTT 持久会话。如果客户端没有现有会话,则创建新的持久会话;如果客户端已有会话,则恢复该会话。

 

设备需要查看 OnMQTTConnect 事件中的 Session 属性,以确定是否存在持久会话。如果 Session 为 True,则存在持久会话,并将存储的消息传递给客户端。如果 Session 为 False,则不存在持久会话,客户端必须重新订阅其主题过滤器。

 

持久会话的默认过期时间为 1 小时。过期时间从消息代理检测到客户端断开连接(MQTT 断开或超时)时开始计算。持久会话过期时间可通过标准限额增加流程延长。如果客户端在过期时间内未恢复会话,则会话将被终止,所有关联的存储消息将被丢弃。过期时间是近似值,会话可能比配置的持续时间延长最多 30 分钟(但不会提前)。

 

临时凭证

AWS IoT Core 可以使用通过 Identity Pools 获取的临时凭据,有 2 种类型的身份:

 

 

未认证

如果您使用未认证的凭证,只需在 IAM 菜单中自动创建的未认证角色上附加策略。然后配置客户端,设置由 Cognito 服务返回的 Access、Secret Key 和 Token。

以下是获取未认证凭据的 .NET 代码

 


CognitoAWSCredentials credentials = new CognitoAWSCredentials(
    "us-east-1:cc3c9c48-646d-44ef-bfd5-0c5fb2f0882f", // Identity pool ID
    Amazon.RegionEndpoint.USEast1 // Region
);
 
var identityPoolId = credentials.GetCredentialsAsync();
 
AmazonCognitoIdentityClient cognitoClient = new AmazonCognitoIdentityClient(
    credentials, // the anonymous credentials
    Amazon.RegionEndpoint.USEast1 // the Amazon Cognito region
);
 
GetIdRequest idRequest = new GetIdRequest();
idRequest.AccountId = "222178873557";
idRequest.IdentityPoolId = "us-east-1:cc3c9c48-646d-44ef-bfd5-0c5fb2f0882f";
 
GetIdResponse idResp = cognitoClient.GetId(idRequest);
 
string AccessKey = identityPoolId.Result.AccessKey;
string SecretKey = identityPoolId.Result.SecretKey;
string SessionToken = identityPoolId.Result.Token;
 
string IdentityId = idResp.IdentityId;

已认证

已认证的凭据,需要在 IAM 菜单中自动创建的已认证角色附加策略,并将用户策略附加到 AWS IoT Core 策略。

因此,在 IoT Core 策略菜单中创建新策略,每次有新用户进行身份验证时,将此策略附加到该用户。

您可以使用以下 AWS 命令附加策略或创建 lambda 函数。

 

aws iot attach-policy --policy-name PolicyName --target us-east-1:XXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX

 

 

设备预配

Fleet Provisioning 服务支持以下 MQTT API 操作:

 

 

CreateCertificateFromCsr

 

使用 CreateCertificateFromCsr 方法,将 CertificateSigningRequest 作为参数传入,以创建证书。为接收此请求的响应,请先订阅以下方法:SubscribeCreateCertificateFromCsrResponse 和 SubscribeCreateCertificateFromCsrError。

 

CreateKeysAndCertificate

 

使用 CreateKeysAndCertificate 方法创建新证书和密钥。为了接收此请求的响应,请先订阅以下方法:SubscribeCreateKeysAndCertificateResponse 和 SubscribeCreateKeysAndCertificateError

 

RegisterThing

 

使用方法 RegisterThing 注册新设备,将模板名称和 JSON 格式的负载作为参数传入。要接收对此请求的响应,请先订阅以下方法:SubscribeRegisterThingResponse 和 SubscribeRegisterThingError。