Redis Stream
Redis Stream is a new data structure introduced in Redis 5.0.
Redis Stream is primarily used for message queues (MQ, Message Queue). Redis itself has a Redis publish/subscribe (pub/sub) feature to implement message queue functionality, but it has a drawback: messages cannot be persisted. If there is a network disconnection, Redis downtime, etc., messages will be discarded.
In simple terms, publish/subscribe (pub/sub) can distribute messages but cannot record historical messages.
Redis Stream provides message persistence and master-slave replication features, allowing any client to access data at any time and remember the access position of each client, while ensuring that messages are not lost.
The structure of Redis Stream is as follows. It has a message linked list that chains all added messages together, each with a unique ID and corresponding content:
Each Stream has a unique name, which is the key in Redis, automatically created when we first append a message using the xadd command.
Diagram explanation:
- Consumer Group: A consumer group created using the XGROUP CREATE command, with multiple consumers (Consumer).
- lastdeliveredid: A cursor, each consumer group has a cursor last_delivered_id. Any consumer reading a message will move the cursor last_delivered_id forward.
- pendingids: A state variable for consumers (Consumer), used to maintain unacknowledged IDs. pendingids records messages that have been read by the client but not yet acknowledged (ack).
Message queue related commands:
- XADD - Add a message to the end
- XTRIM - Trim the stream, limit the length
- XDEL - Delete a message
- XLEN - Get the number of elements in the stream, i.e., message length
- XRANGE - Get the message list, automatically filter out deleted messages
- XREVRANGE - Get the message list in reverse order, IDs from largest to smallest
- XREAD - Get the message list in a blocking or non-blocking manner
Consumer group related commands:
- XGROUP CREATE - Create a consumer group
- XREADGROUP GROUP - Read messages in a consumer group
- XACK - Mark a message as "processed"
- XGROUP SETID - Set a new last delivered message ID for a consumer group
- XGROUP DELCONSUMER - Delete a consumer
- XGROUP DESTROY - Delete a consumer group
- XPENDING - Display information about pending messages
- XCLAIM - Transfer message ownership
- XINFO - View information about streams and consumer groups
- XINFO GROUPS - Print consumer group information
- XINFO STREAM - Print stream information
XADD
Use XADD to add a message to the queue. If the specified queue does not exist, a queue will be created. The syntax for XADD is:
XADD key ID field value [field value ...]
- key: Queue name, created if it does not exist
- ID: Message ID, we use * to indicate that Redis will generate it, it can be customized but must be ensured to be incremental.
- field value: Record.
Example
redis> XADD mystream * name Sara surname OConnor
"1601372323627-0"
redis> XADD mystream * field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis> XLEN mystream
(integer) 2
redis> XRANGE mystream - +
1) 1) "1601372323627-0"
2) 1) "name"
2) "Sara"
3) "surname"
4) "OConnor"
2) 1) "1601372323627-1"
2) 1) "field1"
2) "value1"
3) "field2"
4) "value2"
5) "field3"
6) "value3"
redis>
XTRIM
Use XTRIM to trim the stream, limiting the length. The syntax for XTRIM is:
XTRIM key MAXLEN [~] count
- key: Queue name
- MAXLEN: Length
- count: Number
Example
redis> XTRIM mystream MAXLEN 1
OK
redis> XLEN mystream
(integer) 1
redis>
127.0.0.1:6379> XADD mystream * field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:6379> XTRIM mystream MAXLEN 2
(integer) 0
127.0.0.1:6379> XRANGE mystream - +
1) 1) "1601372434568-0"
2) 1) "field1"
2) "A"
3) "field2"
4) "B"
5) "field3"
6) "C"
7) "field4"
8) "D"
127.0.0.1:6379>
redis>
XDEL
Use XDEL to delete messages with the syntax:
XDEL key ID [ID ...]
- key: queue name
- ID: message ID
Example
> XADD mystream * a 1
1538561698944-0
> XADD mystream * b 2
1538561700640-0
> XADD mystream * c 3
1538561701744-0
> XDEL mystream 1538561700640-0
(integer) 1
127.0.0.1:6379> XRANGE mystream - +
1) 1) 1538561698944-0
2) 1) "a"
2) "1"
2) 1) 1538561701744-0
2) 1) "c"
2) "3"
XLEN
Use XLEN to get the number of elements in the stream, i.e., the message length, with the syntax:
XLEN key
- key: queue name
Example
redis> XADD mystream * item 1
"1601372563177-0"
redis> XADD mystream * item 2
"1601372563178-0"
redis> XADD mystream * item 3
"1601372563178-1"
redis> XLEN mystream
(integer) 3
redis>
XRANGE
Use XRANGE to get the list of messages, automatically filtering out deleted messages, with the syntax:
XRANGE key start end [COUNT count]
- key: queue name
- start: start value,
-
indicates the minimum value - end: end value,
+
indicates the maximum value - count: number of messages
Example
redis> XADD writers * name Virginia surname Woolf
"1601372577811-0"
redis> XADD writers * name Jane surname Austen
"1601372577811-1"
redis> XADD writers * name Toni surname Morrison
"1601372577811-2"
redis> XADD writers * name Agatha surname Christie
"1601372577812-0"
redis> XADD writers * name Ngozi surname Adichie
"1601372577812-1"
redis> XLEN writers
(integer) 5
redis> XRANGE writers - + COUNT 2
1) 1) "1601372577811-0"
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) "1601372577811-1"
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
redis>
XREVRANGE
Use XREVRANGE to get the list of messages, automatically filtering out deleted messages, with the syntax:
XREVRANGE key end start [COUNT count]
- key: queue name
- end: end value,
+
indicates the maximum value - start: start value,
-
indicates the minimum value - count: number of messages
Example
redis> XADD writers * name Virginia surname Woolf
"1601372731458-0"
redis> XADD writers * name Jane surname Austen
This is a Chinese to English translation, please provide the English translation for this text. Do not provide any explanations or text apart from the translation.
Chinese: "1601372731459-0"
redis> XADD writers * name Toni surname Morrison
"1601372731459-1"
redis> XADD writers * name Agatha surname Christie
"1601372731459-2"
redis> XADD writers * name Ngozi surname Adichie
"1601372731459-3"
redis> XLEN writers
(integer) 5
redis> XREVRANGE writers + - COUNT 1
1) 1) "1601372731459-3"
2) 1) "name"
2) "Ngozi"
3) "surname"
4) "Adichie"
redis>
XREAD
Use XREAD to get a list of messages in a blocking or non-blocking manner, syntax format:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count: number of messages
- milliseconds: optional, blocking milliseconds, non-blocking mode if not set
- key: queue name
- id: message ID
Example
# Read two messages from the head of the Stream
> XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
1) 1) "mystream"
2) 1) 1) 1526984818136-0
2) 1) "duration"
2) "1532"
3) "event-id"
4) "5"
5) "user-id"
6) "7782813"
2) 1) 1526999352406-0
2) 1) "duration"
2) "812"
3) "event-id"
4) "9"
5) "user-id"
6) "388234"
2) 1) "writers"
2) 1) 1) 1526985676425-0
2) 1) "name"
2) "Virginia"
3) "surname"
4) "Woolf"
2) 1) 1526985685298-0
2) 1) "name"
2) "Jane"
3) "surname"
4) "Austen"
XGROUP CREATE
Use XGROUP CREATE to create a consumer group, syntax format:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
- key: queue name, create if it does not exist
- groupname: group name
- $: indicates starting from the tail, only accepting new messages, current Stream messages will be ignored
Start consuming from the beginning:
XGROUP CREATE mystream consumer-group-name 0-0
Start consuming from the tail:
XGROUP CREATE mystream consumer-group-name $
XREADGROUP GROUP
Use XREADGROUP GROUP to read messages in a consumer group, syntax format:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group: consumer group name
- consumer: consumer name
- count: number of messages to read
- milliseconds: blocking milliseconds
- key: queue name
- ID: message ID
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >