NATS stream messages grow without being deleted after upgrade from 2.10.24 to 2.10.25 #6419
Comments
Can you please also provide the
The consumer info you posted looks quite far behind the stream sequence numbers, and there are two other consumers. Are all of the consumers in use, or are there stale/unused consumers on that stream?
No, all consumers are active. I have three consumers on that ingress stream. If I restart the streaming app, the messages are processed and the consumer lag falls back again, as you can see. If I roll back NATS to 2.10.24, the behaviour is as expected.
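For reference, one way to double-check this is to compare each consumer's ack floor with the stream's last sequence: with interest retention a message is only deleted once every consumer whose filter matches it has acked it, so a single lagging consumer keeps messages around. Below is a minimal diagnostic sketch assuming the nats.go client; the connection URL is a placeholder, and the stream/consumer names are taken from the jsz output further down in this thread.

package main

import (
	"fmt"
	"log"

	"github.com/nats-io/nats.go"
)

func main() {
	// Adjust the URL for your environment.
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	const stream = "LogStreamIngress"
	si, err := js.StreamInfo(stream)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("stream %s: messages=%d first_seq=%d last_seq=%d\n",
		stream, si.State.Msgs, si.State.FirstSeq, si.State.LastSeq)

	// Consumer names as reported in the account info below.
	for _, name := range []string{"ConsumerIngressEcsNative", "ConsumerIngressJournalD", "ConsumerIngressTest"} {
		ci, err := js.ConsumerInfo(stream, name)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("consumer %s: ack_floor.stream_seq=%d num_pending=%d num_ack_pending=%d\n",
			name, ci.AckFloor.Stream, ci.NumPending, ci.NumAckPending)
	}
}

If any consumer's ack_floor.stream_seq sits far below the stream's last_seq, that consumer is the one holding messages in the interest-based stream.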
It seems that's a regression from earlier versions, isn't it? #5148
The linked issue mentions sourcing; do you also use sourcing? If so, could you share the stream info of both streams? I have tried to reproduce it with the following commands, but that works as expected:

nats str add ingress --subjects='ingress.>' --retention=interest --max-bytes=1000MB --max-age=30d --max-consumers=5 --defaults
nats con add ingress consumer --target=consumeringress --filter=ingress.logs --deliver=all --deliver-group=consumeringress --ack=explicit --wait=10s --heartbeat=3s --flow-control --defaults
nats req 'ingress.logs' ''
nats con sub ingress consumer

Could you possibly share a reproducible example? I'm trying to find out what's specific about this setup. To confirm: with the interest-based stream and a push consumer, a message that has been acked is not removed from the stream on 2.10.25, but it is removed on 2.10.24?
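For anyone trying to narrow this down from code rather than the CLI, a rough Go equivalent of the reproduction above could look like the sketch below. It assumes the nats.go client and a local server; it creates an interest stream with a push queue consumer, publishes one message, acks it, and then checks whether the stream count drops back to zero. Stream, subject and consumer names mirror the CLI commands and are otherwise arbitrary.

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {
	nc, err := nats.Connect(nats.DefaultURL) // adjust for your environment
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Drain()

	js, err := nc.JetStream()
	if err != nil {
		log.Fatal(err)
	}

	// Interest-based stream, mirroring `nats str add ingress ...` above.
	if _, err := js.AddStream(&nats.StreamConfig{
		Name:         "ingress",
		Subjects:     []string{"ingress.>"},
		Retention:    nats.InterestPolicy,
		MaxBytes:     1000 * 1024 * 1024, // roughly --max-bytes=1000MB
		MaxAge:       30 * 24 * time.Hour,
		MaxConsumers: 5,
	}); err != nil {
		log.Fatal(err)
	}

	// Durable push queue consumer with manual acks, similar to the application code.
	done := make(chan struct{}, 1)
	_, err = js.QueueSubscribe("ingress.logs", "consumeringress", func(m *nats.Msg) {
		if err := m.Ack(); err != nil {
			log.Println("ack failed:", err)
		}
		done <- struct{}{}
	}, nats.BindStream("ingress"), nats.Durable("consumer"), nats.ManualAck())
	if err != nil {
		log.Fatal(err)
	}

	if _, err := js.Publish("ingress.logs", []byte("hello")); err != nil {
		log.Fatal(err)
	}
	<-done

	// With interest retention the acked message should be removed shortly after the ack.
	time.Sleep(time.Second)
	si, err := js.StreamInfo("ingress")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("messages left in stream:", si.State.Msgs)
}

On 2.10.24 the expectation is that the final count is 0; the report here is that on 2.10.25 (in the prod environment) messages remain even though they were acked.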
That's how I subscribe:

for key, definition := range nd.consumerConfigurations {
	logger.Info().Msgf("Start subscription for consumer %s at stream %s and subject %s", key, definition.StreamName, definition.ConsumerConfiguration.FilterSubject)
	_, exists := nd.streamConfigurations[definition.StreamName]
	if !exists {
		return fmt.Errorf("no stream configuration found for client configuration %+v", definition)
	}
	subOpts := []nats.SubOpt{
		nats.BindStream(definition.StreamName),
		nats.Durable(definition.ConsumerConfiguration.Durable),
		nats.ManualAck(), // Control ack, in-progress and nak ourselves
	}
	sub, err := js.QueueSubscribe(definition.ConsumerConfiguration.FilterSubject, definition.ConsumerConfiguration.DeliverGroup, definition.MsgHandler, subOpts...)
	if err != nil {
		logger.Error().Err(err).Msgf("QueueSubscribe to %s failed", definition.ConsumerConfiguration.Name)
		return err
	}
	nd.subscriptions[key] = sub
}

The strange thing is that locally I have no issue with that upgrade, only on prod. The whole client application is hosted at https://github.com/suikast42/logunifier. The interesting part is: https://github.com/suikast42/logunifier/blob/master/internal/bootstrap/natsconection.go
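For context on the nats.ManualAck() option above: the handler bound as definition.MsgHandler (it lives in the linked repository) has to ack or nak every message explicitly. A simplified sketch of what such a handler does, where process and logger stand in for the application's own processing function and zerolog logger (not the actual code):

// Simplified handler sketch for use with nats.ManualAck(); process and logger
// are placeholders for the application's own processing function and logger.
func msgHandler(msg *nats.Msg) {
	if err := process(msg.Data); err != nil {
		// Negative-ack so the server redelivers after the ack wait.
		if nakErr := msg.Nak(); nakErr != nil {
			logger.Error().Err(nakErr).Msg("nak failed")
		}
		return
	}
	// Explicit ack: with interest retention this is what lets the stream
	// delete the message once all matching consumers have acked it.
	if err := msg.Ack(); err != nil {
		logger.Error().Err(err).Msg("ack failed")
	}
}

With interest retention, a message that is never acked by one of the matching consumers stays in the stream, which is why confirming the ack path matters here.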
I tried running the application against a single node and an R5 cluster, and it does not reproduce so far. I will need more details to track this down. What does your prod environment look like?
I am running Docker (deployed with HashiCorp Nomad) with the nats:2.10.24-alpine image and updated it to nats:2.10.25-alpine.
@suikast42 possible to share the result of
Local env with no issues:

{
"server": {
"name": "observability.nats[0]",
"host": "0.0.0.0",
"id": "NAMLERFGVL6QQNMZUKPUJRUEGP75V3QPJXIYXD2RLEOUOYVE3YTGOAVA",
"ver": "2.10.25",
"jetstream": true,
"flags": 3,
"seq": 40,
"time": "2025-01-29T13:59:30.092788542Z"
},
"data": {
"server_id": "NAMLERFGVL6QQNMZUKPUJRUEGP75V3QPJXIYXD2RLEOUOYVE3YTGOAVA",
"now": "2025-01-29T13:59:30.092725354Z",
"config": {
"max_memory": 2000000000,
"max_storage": 10000000000,
"store_dir": "/data/jetstream/jetstream",
"sync_interval": 120000000000,
"compress_ok": true
},
"memory": 0,
"storage": 0,
"reserved_memory": 0,
"reserved_storage": 2097152000,
"accounts": 1,
"ha_assets": 0,
"api": {
"total": 20,
"errors": 0
},
"streams": 2,
"consumers": 4,
"messages": 0,
"bytes": 0,
"account_details": [
{
"name": "$G",
"id": "$G",
"memory": 0,
"storage": 0,
"reserved_memory": 18446744073709551615,
"reserved_storage": 18446744073709551615,
"accounts": 0,
"ha_assets": 0,
"api": {
"total": 20,
"errors": 0
},
"stream_detail": [
{
"name": "LogStreamEgress",
"created": "2025-01-29T12:33:05.699120878Z",
"cluster": {
"leader": "observability.nats[0]"
},
"config": {
"name": "LogStreamEgress",
"description": "Egress stream that contains ecs logs in json format for ship in various sinks",
"subjects": [
"egress.logs.ecs"
],
"retention": "interest",
"max_consumers": 5,
"max_msgs": -1,
"max_bytes": 1048576000,
"max_age": 2592000000000000,
"max_msgs_per_subject": -1,
"max_msg_size": -1,
"discard": "old",
"storage": "file",
"num_replicas": 1,
"duplicate_window": 300000000000,
"compression": "none",
"allow_direct": false,
"mirror_direct": false,
"sealed": false,
"deny_delete": false,
"deny_purge": false,
"allow_rollup_hdrs": false,
"consumer_limits": {}
},
"state": {
"messages": 0,
"bytes": 0,
"first_seq": 48266487,
"first_ts": "1970-01-01T00:00:00Z",
"last_seq": 48266486,
"last_ts": "2025-01-29T13:59:29.994578735Z",
"consumer_count": 1
},
"consumer_detail": [
{
"stream_name": "LogStreamEgress",
"name": "ConsumerEgressLokiShipper",
"created": "2025-01-29T12:33:05.699774787Z",
"config": {
"durable_name": "ConsumerEgressLokiShipper",
"name": "ConsumerEgressLokiShipper",
"description": "Push consumer for subject egress.logs.ecs",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 10000000000,
"max_deliver": -1,
"filter_subject": "egress.logs.ecs",
"replay_policy": "instant",
"max_ack_pending": 1024,
"idle_heartbeat": 3000000000,
"flow_control": true,
"deliver_subject": "ConsumerEgressLokiShipper_Group",
"deliver_group": "ConsumerEgressLokiShipper",
"inactive_threshold": 86400000000000,
"num_replicas": 1
},
"delivered": {
"consumer_seq": 48264280,
"stream_seq": 48266486,
"last_active": "2025-01-29T13:59:29.995085866Z"
},
"ack_floor": {
"consumer_seq": 48264280,
"stream_seq": 48266486,
"last_active": "2025-01-29T13:59:29.995140097Z"
},
"num_ack_pending": 0,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 0,
"push_bound": true,
"ts": "2025-01-29T13:59:30.092771423Z"
}
]
},
{
"name": "LogStreamIngress",
"created": "2025-01-29T12:33:05.699445727Z",
"cluster": {
"leader": "observability.nats[0]"
},
"config": {
"name": "LogStreamIngress",
"description": "Ingress stream for unify and enrich logs from various formats to ecs",
"subjects": [
"ingress.logs.journald",
"ingress.logs.test",
"ingress.logs.ecs"
],
"retention": "interest",
"max_consumers": 5,
"max_msgs": -1,
"max_bytes": 1048576000,
"max_age": 2592000000000000,
"max_msgs_per_subject": -1,
"max_msg_size": -1,
"discard": "old",
"storage": "file",
"num_replicas": 1,
"duplicate_window": 300000000000,
"compression": "none",
"allow_direct": false,
"mirror_direct": false,
"sealed": false,
"deny_delete": false,
"deny_purge": false,
"allow_rollup_hdrs": false,
"consumer_limits": {}
},
"state": {
"messages": 0,
"bytes": 0,
"first_seq": 51466358,
"first_ts": "1970-01-01T00:00:00Z",
"last_seq": 51466357,
"last_ts": "2025-01-29T13:59:29.993866671Z",
"consumer_count": 3
},
"consumer_detail": [
{
"stream_name": "LogStreamIngress",
"name": "ConsumerIngressEcsNative",
"created": "2025-01-29T12:33:05.699931996Z",
"config": {
"durable_name": "ConsumerIngressEcsNative",
"name": "ConsumerIngressEcsNative",
"description": "Push consumer for subject ingress.logs.ecs",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 10000000000,
"max_deliver": -1,
"filter_subject": "ingress.logs.ecs",
"replay_policy": "instant",
"max_ack_pending": 1024,
"idle_heartbeat": 3000000000,
"flow_control": true,
"deliver_subject": "ConsumerIngressEcsNative_Group",
"deliver_group": "ConsumerIngressEcsNative",
"inactive_threshold": 86400000000000,
"num_replicas": 1
},
"delivered": {
"consumer_seq": 24966541,
"stream_seq": 51466348,
"last_active": "2025-01-29T13:59:29.666221021Z"
},
"ack_floor": {
"consumer_seq": 24966541,
"stream_seq": 51466348,
"last_active": "2025-01-29T13:59:29.666383543Z"
},
"num_ack_pending": 0,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 0,
"push_bound": true,
"ts": "2025-01-29T13:59:30.092774174Z"
},
{
"stream_name": "LogStreamIngress",
"name": "ConsumerIngressJournalD",
"created": "2025-01-29T12:33:05.700037262Z",
"config": {
"durable_name": "ConsumerIngressJournalD",
"name": "ConsumerIngressJournalD",
"description": "Push consumer for subject ingress.logs.journald",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 10000000000,
"max_deliver": -1,
"filter_subject": "ingress.logs.journald",
"replay_policy": "instant",
"max_ack_pending": 1024,
"idle_heartbeat": 3000000000,
"flow_control": true,
"deliver_subject": "ConsumerIngressJournalD_Group",
"deliver_group": "ConsumerIngressJournalD",
"inactive_threshold": 86400000000000,
"num_replicas": 1
},
"delivered": {
"consumer_seq": 23298507,
"stream_seq": 51466357,
"last_active": "2025-01-29T13:59:29.994645962Z"
},
"ack_floor": {
"consumer_seq": 23298507,
"stream_seq": 51466357,
"last_active": "2025-01-29T13:59:29.994836728Z"
},
"num_ack_pending": 0,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 0,
"push_bound": true,
"ts": "2025-01-29T13:59:30.092774852Z"
},
{
"stream_name": "LogStreamIngress",
"name": "ConsumerIngressTest",
"created": "2025-01-29T12:33:05.700127728Z",
"config": {
"durable_name": "ConsumerIngressTest",
"name": "ConsumerIngressTest",
"description": "Push consumer for subject ingress.logs.test",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 10000000000,
"max_deliver": -1,
"filter_subject": "ingress.logs.test",
"replay_policy": "instant",
"max_ack_pending": 1024,
"idle_heartbeat": 3000000000,
"flow_control": true,
"deliver_subject": "ConsumerIngressTest_Group",
"deliver_group": "ConsumerIngressTest",
"inactive_threshold": 86400000000000,
"num_replicas": 1
},
"delivered": {
"consumer_seq": 0,
"stream_seq": 51466345
},
"ack_floor": {
"consumer_seq": 0,
"stream_seq": 0
},
"num_ack_pending": 0,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 0,
"push_bound": true,
"ts": "2025-01-29T13:59:30.092775291Z"
}
]
}
]
}
]
}
}

Prod env with issues:

{
"server": {
"name": "observability.nats[0]",
"host": "0.0.0.0",
"id": "NBRLLW3BROHFOJC2JISUHIDES6DM5HK35QBH6Q5ZYZFPCXOSPYWUPCEO",
"ver": "2.10.25",
"jetstream": true,
"flags": 3,
"seq": 84,
"time": "2025-01-29T14:15:12.79328397Z"
},
"data": {
"server_id": "NBRLLW3BROHFOJC2JISUHIDES6DM5HK35QBH6Q5ZYZFPCXOSPYWUPCEO",
"now": "2025-01-29T14:15:12.793206353Z",
"config": {
"max_memory": 2000000000,
"max_storage": 10000000000,
"store_dir": "/data/jetstream/jetstream",
"sync_interval": 120000000000,
"compress_ok": true
},
"memory": 0,
"storage": 1621013,
"reserved_memory": 0,
"reserved_storage": 2097152000,
"accounts": 1,
"ha_assets": 0,
"api": {
"total": 21,
"errors": 0
},
"streams": 2,
"consumers": 4,
"messages": 974,
"bytes": 1621013,
"account_details": [
{
"name": "$G",
"id": "$G",
"memory": 0,
"storage": 1621013,
"reserved_memory": 18446744073709552000,
"reserved_storage": 18446744073709552000,
"accounts": 0,
"ha_assets": 0,
"api": {
"total": 21,
"errors": 0
},
"stream_detail": [
{
"name": "LogStreamEgress",
"created": "2025-01-29T11:05:00.286927073Z",
"cluster": {
"leader": "observability.nats[0]"
},
"config": {
"name": "LogStreamEgress",
"description": "Egress stream that contains ecs logs in json format for ship in various sinks",
"subjects": [
"egress.logs.ecs"
],
"retention": "interest",
"max_consumers": 5,
"max_msgs": -1,
"max_bytes": 1048576000,
"max_age": 2592000000000000,
"max_msgs_per_subject": -1,
"max_msg_size": -1,
"discard": "old",
"storage": "file",
"num_replicas": 1,
"duplicate_window": 300000000000,
"compression": "none",
"allow_direct": false,
"mirror_direct": false,
"sealed": false,
"deny_delete": false,
"deny_purge": false,
"allow_rollup_hdrs": false,
"consumer_limits": {}
},
"state": {
"messages": 0,
"bytes": 0,
"first_seq": 8752859,
"first_ts": "1970-01-01T00:00:00Z",
"last_seq": 8752858,
"last_ts": "2025-01-29T14:15:12.686087984Z",
"consumer_count": 1
},
"consumer_detail": [
{
"stream_name": "LogStreamEgress",
"name": "ConsumerEgressLokiShipper",
"created": "2025-01-29T11:05:00.289711657Z",
"config": {
"durable_name": "ConsumerEgressLokiShipper",
"name": "ConsumerEgressLokiShipper",
"description": "Push consumer for subject egress.logs.ecs",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 10000000000,
"max_deliver": -1,
"filter_subject": "egress.logs.ecs",
"replay_policy": "instant",
"max_ack_pending": 1024,
"idle_heartbeat": 3000000000,
"flow_control": true,
"deliver_subject": "ConsumerEgressLokiShipper_Group",
"deliver_group": "ConsumerEgressLokiShipper",
"inactive_threshold": 86400000000000,
"num_replicas": 1
},
"delivered": {
"consumer_seq": 8754816,
"stream_seq": 8752858,
"last_active": "2025-01-29T14:15:12.688945372Z"
},
"ack_floor": {
"consumer_seq": 8754816,
"stream_seq": 8752858,
"last_active": "2025-01-29T14:15:12.693099292Z"
},
"num_ack_pending": 0,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 0,
"push_bound": true,
"ts": "2025-01-29T14:15:12.79325584Z"
}
]
},
{
"name": "LogStreamIngress",
"created": "2025-01-29T11:05:00.288058897Z",
"cluster": {
"leader": "observability.nats[0]"
},
"config": {
"name": "LogStreamIngress",
"description": "Ingress stream for unify and enrich logs from various formats to ecs",
"subjects": [
"ingress.logs.journald",
"ingress.logs.test",
"ingress.logs.ecs"
],
"retention": "interest",
"max_consumers": 5,
"max_msgs": -1,
"max_bytes": 1048576000,
"max_age": 2592000000000000,
"max_msgs_per_subject": -1,
"max_msg_size": -1,
"discard": "old",
"storage": "file",
"num_replicas": 1,
"duplicate_window": 300000000000,
"compression": "none",
"allow_direct": false,
"mirror_direct": false,
"sealed": false,
"deny_delete": false,
"deny_purge": false,
"allow_rollup_hdrs": false,
"consumer_limits": {}
},
"state": {
"messages": 974,
"bytes": 1621013,
"first_seq": 9154790,
"first_ts": "2025-01-29T13:27:20.314603479Z",
"last_seq": 9451518,
"last_ts": "2025-01-29T14:15:12.683601078Z",
"num_subjects": 2,
"num_deleted": 295755,
"consumer_count": 3
},
"consumer_detail": [
{
"stream_name": "LogStreamIngress",
"name": "ConsumerIngressEcsNative",
"created": "2025-01-29T11:05:00.290649711Z",
"config": {
"durable_name": "ConsumerIngressEcsNative",
"name": "ConsumerIngressEcsNative",
"description": "Push consumer for subject ingress.logs.ecs",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 10000000000,
"max_deliver": -1,
"filter_subject": "ingress.logs.ecs",
"replay_policy": "instant",
"max_ack_pending": 1024,
"idle_heartbeat": 3000000000,
"flow_control": true,
"deliver_subject": "ConsumerIngressEcsNative_Group",
"deliver_group": "ConsumerIngressEcsNative",
"inactive_threshold": 86400000000000,
"num_replicas": 1
},
"delivered": {
"consumer_seq": 4818035,
"stream_seq": 9451491,
"last_active": "2025-01-29T14:15:12.36590658Z"
},
"ack_floor": {
"consumer_seq": 4818035,
"stream_seq": 9451491,
"last_active": "2025-01-29T14:15:12.366725033Z"
},
"num_ack_pending": 0,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 0,
"push_bound": true,
"ts": "2025-01-29T14:15:12.79326256Z"
},
{
"stream_name": "LogStreamIngress",
"name": "ConsumerIngressJournalD",
"created": "2025-01-29T11:05:00.291230711Z",
"config": {
"durable_name": "ConsumerIngressJournalD",
"name": "ConsumerIngressJournalD",
"description": "Push consumer for subject ingress.logs.journald",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 10000000000,
"max_deliver": -1,
"filter_subject": "ingress.logs.journald",
"replay_policy": "instant",
"max_ack_pending": 1024,
"idle_heartbeat": 3000000000,
"flow_control": true,
"deliver_subject": "ConsumerIngressJournalD_Group",
"deliver_group": "ConsumerIngressJournalD",
"inactive_threshold": 86400000000000,
"num_replicas": 1
},
"delivered": {
"consumer_seq": 4018282,
"stream_seq": 9451518,
"last_active": "2025-01-29T14:15:12.685839497Z"
},
"ack_floor": {
"consumer_seq": 4018282,
"stream_seq": 9451518,
"last_active": "2025-01-29T14:15:12.689595533Z"
},
"num_ack_pending": 0,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 0,
"push_bound": true,
"ts": "2025-01-29T14:15:12.793264397Z"
},
{
"stream_name": "LogStreamIngress",
"name": "ConsumerIngressTest",
"created": "2025-01-29T11:05:00.291632609Z",
"config": {
"durable_name": "ConsumerIngressTest",
"name": "ConsumerIngressTest",
"description": "Push consumer for subject ingress.logs.test",
"deliver_policy": "all",
"ack_policy": "explicit",
"ack_wait": 10000000000,
"max_deliver": -1,
"filter_subject": "ingress.logs.test",
"replay_policy": "instant",
"max_ack_pending": 1024,
"idle_heartbeat": 3000000000,
"flow_control": true,
"deliver_subject": "ConsumerIngressTest_Group",
"deliver_group": "ConsumerIngressTest",
"inactive_threshold": 86400000000000,
"num_replicas": 1
},
"delivered": {
"consumer_seq": 0,
"stream_seq": 9451490
},
"ack_floor": {
"consumer_seq": 0,
"stream_seq": 0
},
"num_ack_pending": 0,
"num_redelivered": 0,
"num_waiting": 0,
"num_pending": 0,
"push_bound": true,
"ts": "2025-01-29T14:15:12.793265944Z"
}
]
}
]
}
]
}
}
Not sure yet how this can be reproduced; it may be specific to something in the environment. Could you create backups of the streams and restore them on v2.10.25 in your prod env to see whether it still reproduces?
Will do that as soon as possible.
Observed behavior
Consumer config:
Nats conf
Expected behavior
These are the metrics of 2.10.24
After the upgrade to 2.10.25
Server and client version
2.10.24 -> 2.10.25
Host environment
No response
Steps to reproduce
No response