The documentation you are viewing is for Dapr v1.10 which is an older version of Dapr. For up-to-date documentation, see the latest version.
RocketMQ
Component format
To set up RocketMQ pub/sub, create a component of type pubsub.rocketmq. See this guide on how to create and apply a pub/sub configuration.
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: rocketmq-pubsub
spec:
  type: pubsub.rocketmq
  version: v1
  metadata:
    - name: instanceName
      value: dapr-rocketmq-test
    - name: consumerGroup
      value: dapr-rocketmq-test-g-c
    - name: producerGroup
      value: dapr-rocketmq-test-g-p
    - name: nameSpace
      value: dapr-test
    - name: nameServer
      value: "127.0.0.1:9876,127.0.0.2:9876"
    - name: retries
      value: 3
    - name: consumerModel
      value: "clustering"
    - name: consumeOrderly
      value: false
Warning
The above example uses secrets as plain strings. It is recommended to use a secret store for the secrets as described here.
Spec metadata fields
Field | Required | Details | Default | Example |
---|---|---|---|---|
instanceName | N | Instance name | time.Now().String() | dapr-rocketmq-test |
consumerGroup | N | Consumer group name. Recommended. If consumerGroup is null, groupName is used. | | dapr-rocketmq-test-g-c |
producerGroup (consumerID) | N | Producer group name. Recommended. If producerGroup is null, consumerID is used. If consumerID is also null, groupName is used. | | dapr-rocketmq-test-g-p |
groupName | N | Consumer/Producer group name. Deprecated. | | dapr-rocketmq-test-g |
nameSpace | N | RocketMQ namespace | | dapr-rocketmq |
nameServerDomain | N | RocketMQ name server domain | | https://my-app.net:8080/nsaddr |
nameServer | N | RocketMQ name servers, separated by "," or ";" | | 127.0.0.1:9876;127.0.0.2:9877,127.0.0.3:9877 |
accessKey | N | Access Key (Username) | | "admin" |
secretKey | N | Secret Key (Password) | | "password" |
securityToken | N | Security Token | | |
retries | N | Number of retries to send a message to the broker | 3 | 3 |
producerQueueSelector (queueSelector) | N | Producer queue selector. There are five implementations of queue selector: hash, random, manual, roundRobin, dapr. | dapr | hash |
consumerModel | N | Message model that defines how messages are delivered to each consumer client. RocketMQ supports two message models: clustering and broadcasting. | clustering | broadcasting, clustering |
fromWhere (consumeFromWhere) | N | Consuming point when the consumer boots. There are three consuming points: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, CONSUME_FROM_TIMESTAMP | CONSUME_FROM_LAST_OFFSET | CONSUME_FROM_LAST_OFFSET |
consumeOrderly | N | Determines if it's an ordered message using FIFO order. | false | false |
consumeMessageBatchMaxSize | N | Batch consumption size. Must be in the range [1, 1024]. | 512 | 10 |
maxReconsumeTimes | N | Max re-consume times. -1 means 16 times. If messages are re-consumed more than maxReconsumeTimes times before success, they'll be directed to a deletion queue. | Orderly message is MaxInt32; Concurrently message is 16 | 16 |
autoCommit | N | Enable auto commit | true | false |
pullInterval | N | Message pull interval | 100 | 100 |
pullBatchSize | N | The number of messages pulled from the broker at a time. If pullBatchSize is null, consumerBatchSize is used. Must be in the range [1, 1024]. | 32 | 10 |
content-type | N | Message content type. | "text/plain" | "application/cloudevents+json; charset=utf-8", "application/octet-stream" |
logLevel | N | Log level | warn | info |
sendTimeOut | N | Timeout for sending a message to the RocketMQ broker, measured in nanoseconds. Deprecated. | 3 seconds | 10000000000 |
sendTimeOutSec | N | Timeout duration for publishing a message in seconds. If sendTimeOutSec is null, sendTimeOut is used. | 3 seconds | 3 |
mspProperties | N | The RocketMQ message properties in this collection are passed to the app in Data. Separate multiple properties with "," | | key,mkey |
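The fallback rules in the table can be easier to follow as code. The following is a minimal Python sketch of three of them: splitting nameServer on "," or ";", resolving the producer group (producerGroup, then consumerID, then groupName), and preferring sendTimeOutSec over the deprecated sendTimeOut. The function names are hypothetical and exist only for illustration; this is not the component's actual implementation.

```python
import re
from typing import Dict, List, Optional


def split_name_servers(value: str) -> List[str]:
    """Split a nameServer value on "," or ";" and drop empty entries."""
    return [part.strip() for part in re.split(r"[,;]", value) if part.strip()]


def resolve_producer_group(metadata: Dict[str, str]) -> Optional[str]:
    """Apply the documented order: producerGroup, then consumerID, then groupName."""
    for key in ("producerGroup", "consumerID", "groupName"):
        value = metadata.get(key)
        if value:
            return value
    return None


def resolve_send_timeout_seconds(metadata: Dict[str, str], default: float = 3.0) -> float:
    """Prefer sendTimeOutSec (seconds); fall back to sendTimeOut (nanoseconds)."""
    if metadata.get("sendTimeOutSec"):
        return float(metadata["sendTimeOutSec"])
    if metadata.get("sendTimeOut"):
        # The deprecated field is expressed in nanoseconds.
        return int(metadata["sendTimeOut"]) / 1_000_000_000
    # Assumption: the 3-second default from the table applies when neither is set.
    return default
```

For instance, under these assumptions a sendTimeOut of 10000000000 resolves to a 10-second timeout, and a component that sets only groupName uses that name as its producer group.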
For backwards-compatibility reasons, the following values in the metadata are supported, although their use is discouraged.
Field (supported but deprecated) | Required | Details | Example |
---|---|---|---|
groupName | N | Producer group name for RocketMQ publishers | "my_unique_group_name" |
sendTimeOut | N | Timeout duration for publishing a message in nanoseconds | 0 |
consumerBatchSize | N | The number of messages pulled from the broker at a time | 32 |
Set up RocketMQ
See https://rocketmq.apache.org/docs/quick-start/ to set up a local RocketMQ instance.
Per-call metadata fields
Partition Key
When invoking the RocketMQ pub/sub, you can provide an optional partition key by using the metadata query parameter in the request URL.
The metadata keys are rocketmq-tag, rocketmq-key, rocketmq-shardingkey, and rocketmq-queue.
Example:
curl -X POST "http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-tag=?&metadata.rocketmq-key=?&metadata.rocketmq-shardingkey=key&metadata.rocketmq-queue=1" \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'
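The same request can be assembled programmatically. The sketch below uses only the Python standard library to build the publish URL with metadata.* query parameters and to POST a JSON body. The helper names build_publish_url and publish are hypothetical, and the sidecar address, component name, and topic are taken from the curl example above.

```python
import json
import urllib.request
from urllib.parse import quote, urlencode


def build_publish_url(base_url, pubsub_name, topic, rocketmq_metadata):
    """Build a Dapr publish URL, prefixing each metadata key with "metadata."."""
    path = f"/v1.0/publish/{quote(pubsub_name, safe='')}/{quote(topic, safe='')}"
    query = urlencode({f"metadata.{key}": value for key, value in rocketmq_metadata.items()})
    return f"{base_url}{path}?{query}" if query else f"{base_url}{path}"


def publish(url, payload):
    """POST a JSON payload to the given URL and return the HTTP status code."""
    body = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return response.status


if __name__ == "__main__":
    url = build_publish_url(
        "http://localhost:3500",
        "myRocketMQ",
        "myTopic",
        {"rocketmq-shardingkey": "key", "rocketmq-queue": "1"},
    )
    # Requires a running Dapr sidecar on localhost:3500.
    print(publish(url, {"data": {"message": "Hi"}}))
```

Building the query string with urlencode avoids the shell-quoting issues that an unquoted "&" causes on the command line.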
QueueSelector
The RocketMQ component contains a total of five queue selectors. The RocketMQ client provides the following queue selectors:
- HashQueueSelector
- RandomQueueSelector
- RoundRobinQueueSelector
- ManualQueueSelector
To learn more about these RocketMQ client queue selectors, read the RocketMQ documentation.
The Dapr RocketMQ component implements the following queue selector:
- DaprQueueSelector
This article focuses on the design of DaprQueueSelector.
DaprQueueSelector
DaprQueueSelector integrates three queue selectors:
- HashQueueSelector
- RoundRobinQueueSelector
- ManualQueueSelector
DaprQueueSelector gets the queue id from the request parameter. You can set the queue id by running the following:
http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-queue=1
The ManualQueueSelector is implemented using the method above.
Next, the DaprQueueSelector tries to:
- Get a ShardingKey
- Hash the ShardingKey to determine the queue id.
You can set the ShardingKey by doing the following:
http://localhost:3500/v1.0/publish/myRocketMQ/myTopic?metadata.rocketmq-shardingkey=key
If the ShardingKey does not exist, the RoundRobin algorithm is used to determine the queue id.
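The selection order described above (explicit queue id, then hashed ShardingKey, then round robin) can be sketched in Python as follows. This is an illustration under stated assumptions, not the component's code: the class name QueueSelectorSketch is hypothetical, CRC32 stands in for whatever hash function the component uses, and falling through on an invalid or out-of-range queue id is a choice made for this sketch.

```python
import itertools
import zlib
from typing import Dict


class QueueSelectorSketch:
    """Illustrative stand-in for the manual -> hash -> round-robin order."""

    def __init__(self) -> None:
        # Counter that backs the round-robin fallback.
        self._counter = itertools.count()

    def select(self, metadata: Dict[str, str], queue_count: int) -> int:
        # 1. Manual: use the queue id from the request metadata when present.
        queue = metadata.get("rocketmq-queue", "")
        if queue.isdigit() and int(queue) < queue_count:
            return int(queue)
        # Assumption: an invalid or out-of-range id falls through to the next step.

        # 2. Hash: map the sharding key to a queue id deterministically.
        sharding_key = metadata.get("rocketmq-shardingkey", "")
        if sharding_key:
            # CRC32 is a placeholder hash chosen for this sketch.
            return zlib.crc32(sharding_key.encode("utf-8")) % queue_count

        # 3. Round robin: cycle through the queues in order.
        return next(self._counter) % queue_count


if __name__ == "__main__":
    selector = QueueSelectorSketch()
    print(selector.select({"rocketmq-queue": "1"}, 4))
    print(selector.select({"rocketmq-shardingkey": "key"}, 4))
    print(selector.select({}, 4))
```

Because the hash step is deterministic, messages that share a ShardingKey always land on the same queue, which is what makes per-key ordering possible.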
Related links
- Basic schema for a Dapr component
- Pub/Sub building block
- Read this guide for instructions on configuring pub/sub components