> However I’m not living into the illusion that I got everything right in the first release, so it will take months (or years?) of iteration to really reach the operational simplicity I’m targeting.
It's always refreshing to hear such good programmers acknowledging how hard building complex systems is.
And this from antirez, who writes such excellent programs. Redis runs at several companies I've been at, without ever crashing or causing any trouble; most people I've met forget they have Redis running because it just works and works and works. Can't praise that piece of code highly enough.
Indeed, we have had Redis running in production for four years now and I can't remember a single crash happening. If something goes wrong in our stack, Redis is the last thing we check. Antirez should write an HTTP web server :-)
Disque is definitely exciting, and looks like it can replace RabbitMQ, which has serious flaws in its clustering design. I'm looking forward to trying it out.
However, if some constructive criticism is permitted, I have to say that, having written distributed applications for many years, I have come to dislike the "classical" push/pop queue data model:
* Acking is a bad idea. It requires the broker to manage a lot of state, including locking and timeouts.
* Re-queuing invalidates total ordering.
* On the performance side, parallel distributed queue consumption (which also breaks total ordering) is directly at odds with this model.
* Queues as opaque objects — you can only inspect by popping the top message, and you cannot access older, dequeued messages. Fortunately, Disque allows you to read the entire queue without mutating it, but it doesn't look like you can read old messages.
* Complicated queue topologies (fanouts, dead letter queues, etc.) become a logical necessity of the strict FIFO structure. (These topologies need to be declared every time the client starts up, which introduces the possibility of schema conflicts.)
* Logical de-duping is probably not possible.
Apache Kafka gets the data model right. It wisely acknowledges that queues are linear and should stay that way: in Kafka, queues are strictly append-only logs where every consumer keeps a cursor to the last position it read. In this model, many of the classical concerns melt away:
* Acks/nacks are unnecessary; consumers simply "commit" their position.
* Total ordering is always preserved (the queue cannot be reordered), and parallelism is made explicit through named partitions.
* De-duping is trivial.
* Complicated topologies are largely eliminated: AMQP-type "exchanges" that fan out to separate queues are unnecessary, because multiple readers can all consume the same queue without changing it, as each reader's position is independent of the queue.
* You get to choose either at-most-once or at-least-once delivery by how carefully you manage your offset.
Since logs are strictly linear, you are also given the choice of how much history to keep — all of it, if you want — which opens up some interesting use cases that are not possible with classical brokers.
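The cursor model described above can be sketched in a few lines of Python. This is a toy in-memory log, not Kafka's actual API; all names are illustrative:

```python
class Log:
    """Append-only log; the log itself is never mutated by reads."""
    def __init__(self):
        self.entries = []

    def append(self, msg):
        self.entries.append(msg)

    def read(self, offset, max_count=10):
        return self.entries[offset:offset + max_count]


class Consumer:
    """Each consumer keeps its own cursor, independent of the log."""
    def __init__(self, log):
        self.log = log
        self.offset = 0

    def poll(self):
        return self.log.read(self.offset)

    def commit(self, n):
        # Commit AFTER processing for at-least-once delivery,
        # or BEFORE processing for at-most-once.
        self.offset += n


log = Log()
for i in range(5):
    log.append(f"event-{i}")

a, b = Consumer(log), Consumer(log)
batch = a.poll()
a.commit(len(batch))                  # A has consumed everything...
assert b.poll() == log.entries[:5]    # ...yet B still sees the full log
```

Because reads never mutate the log, any number of consumers can "fan out" over the same data without the queue topology knowing about them.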
Kafka isn't perfect. It's a huge pain if you're not in Java land. There are no modern, mature "high-level" client implementations for Go, Ruby or Node.js. Its reliance on the JVM and on ZooKeeper makes it fairly heavyweight, both on the server and on the consumer side (the new API for broker-stored offsets simplifies things, but non-Java clients are far behind). I would really love to have a lighter-weight, language-agnostic Kafka-like implementation without the Java baggage.
Last point: The fact that Disque calls its messages "jobs" makes me a little disappointed that it is, in fact, not a job management system. I'd love a solid, distributed job manager.
Kafka has a different data model which works for some scenarios, but not others.
We attempted to use Kafka as part of a job management system, where long-running jobs were scheduled and workers consumed partitions, but what we found was that since consumers work on partitions, a long-running task could block an entire partition's worth of work, with no way to migrate it to another partition.
Kafka works really well when the bottleneck is the broker to begin with -- if you have a lot of small, lightweight messages being passed to other systems, and the broker is having trouble keeping up. Analytics and logs are great examples where Kafka's data model works really well.
But for a smaller number of larger messages that take seconds to minutes to process (we were using it to schedule data downloads and video processing), workers could sit idle while there was still plenty more work to do.
We moved to Amazon SQS for now, but I imagine we'll move to something like Disque in the future.
I would argue that job management is unrelated to messaging, at least according to my loose definition of a task that:
* Has a well-defined lifetime — unstarted, running, paused, completed successfully, or failed;
* Is executed from some kind of parameterized "job specification" that describes its inputs and desired behavior;
* Has state data (e.g., completion % progress, log output, metrics, transactional continuation state for pausing/retrying);
* Can be scheduled on multiple nodes;
* Can be created and scheduled multiple times;
* Can be scheduled by priority;
* Can have its resources (RAM, CPU, I/O) constrained.
The thing is, queues are terrible at this. People continue to abuse messaging systems as a scheduler, and it always sucks.
For example, consider acking: for a long-running task that might take hours, do you really want to hold up the broker's internal lock for all that time? What if you restart the broker? The task is still running, but now you've decoupled its running state from the queue. Similarly, queues are terrible at finding out what's waiting or what has happened. By using the queue as job state, you are coupling scheduling with execution, which is just wrong.
A queue (one that supports priorities, mind you) is, however, excellent for management commands: "Start job X", "pause job X" and so on. But you will want to maintain execution state separately from the queue. A relational database is quite good at this, though one could use a NoSQL database, too. First, create a row representing the job: {id="job1", state="new", type="import_stuff", params="..."}. Then push a queue message {command="start_job", job_id="job1"}. Run a queue consumer or ten on each node. Each consumer acks the message, finds the job in the table and starts it. One could run it as a child process, or in a Docker container. One could of course use an orchestration service (like Mesos/Marathon) to run it, then monitor the service for container events and update the job table appropriately.
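A minimal sketch of that pattern, with SQLite standing in for the job table and an in-process queue standing in for the broker (all names hypothetical; a real setup would ack the broker message and launch a worker process where indicated):

```python
import json
import queue
import sqlite3

# Execution state lives in a table, separate from the queue.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE jobs (id TEXT PRIMARY KEY, state TEXT, type TEXT, params TEXT)")

# The queue carries only lightweight management commands.
commands = queue.Queue()

def create_job(job_id, job_type, params):
    # 1. Record the job row first...
    db.execute("INSERT INTO jobs VALUES (?, 'new', ?, ?)",
               (job_id, job_type, json.dumps(params)))
    # 2. ...then enqueue a command that merely references it.
    commands.put({"command": "start_job", "job_id": job_id})

def consume_one():
    msg = commands.get()          # a real broker would require an ack here
    job_id = msg["job_id"]
    db.execute("UPDATE jobs SET state = 'running' WHERE id = ?", (job_id,))
    # ... launch the actual work as a child process or container here ...
    db.execute("UPDATE jobs SET state = 'done' WHERE id = ?", (job_id,))

create_job("job1", "import_stuff", {"src": "somewhere"})
consume_one()
state, = db.execute("SELECT state FROM jobs WHERE id = 'job1'").fetchone()
assert state == "done"
```

The key design point is that restarting the broker (or losing a queue message) never corrupts job state; the table remains the single source of truth about what is running.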
I would want to take it further and allow the job to update itself through an API, so that it could publish metrics and log messages, as well as maintain whatever snapshot data is useful. For example, a long-running task that imports data into a database would benefit from maintaining a cursor, so that you could pause/kill the job and later resume it at the point where it left off.
Sure, that's actually very similar to the system we built internally. But I don't see how Kafka as a messaging layer helps in your system, either. Presumably, the consumers won't advance their offset in their partition until the job is done, because they are not ready to start a new job. So you will still get the same blocking behavior.
In addition, I don't see how you implement priority in a system like this, since Kafka partitions are append-only. You have to process the earlier start_job messages before you reach the higher-priority items.
Having a broker hand out items in priority to workers makes a lot more sense to me.
The job system you described is definitely something missing from the OSS world, and we'd love to open up our system for this eventually. While Disque is not that piece, I think it makes more sense than Kafka.
I didn't mean to imply that Kafka was suitable for such a system, though that's mostly because it doesn't support priority queues. You could work around this by using a fixed range of levels and one queue per priority level, though it would probably just complicate things.
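The one-queue-per-priority-level workaround could look something like this (illustrative only; the deques stand in for real broker queues):

```python
from collections import deque

# One queue per fixed priority level; 0 is highest.
PRIORITY_LEVELS = 3
queues = [deque() for _ in range(PRIORITY_LEVELS)]

def push(msg, priority):
    queues[priority].append(msg)

def pop():
    # Always drain higher-priority queues before lower ones.
    for q in queues:
        if q:
            return q.popleft()
    return None

push("low", 2)
push("urgent", 0)
push("normal", 1)
assert pop() == "urgent"
assert pop() == "normal"
assert pop() == "low"
```

Note the catch this strategy inherits: a steady stream of high-priority messages starves the lower levels entirely, which is one reason it tends to complicate things rather than solve them.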
My gripe with using a relational database as a priority queue is that it tends not to scale well (much as if you tried to use a relational database as a plain queue).
Once you get to the tens of millions of messages per day (for example, ~8M at priority X and 2M at priority Y), performance tends to go down the drain (due to contention) and disk space bloats (due to vacuuming/deletes).
I've yet to come across a good task scheduling system that takes into account the available resources and the priority of the tasks, while remaining usable on a largish dataset and not being a pain to maintain.
What are the downsides of SQS if you expect a small number of large messages (well, not larger than the 4K limit, I think)? The bill should be a lot cheaper than running a broker yourself. Are you worried about lock-in? SQS APIs are simple enough to replicate with a REST service backed by a database for low-usage scenarios.
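The core SQS semantics are indeed small enough to imitate: a received message becomes invisible until it is either deleted (the "ack") or its visibility timeout expires, at which point it is redelivered. A toy in-memory sketch of just that behavior, with hypothetical names (not the real SQS API):

```python
import time

class MiniQueue:
    """Toy SQS-like queue: receive hides a message until it is
    deleted or its visibility timeout expires."""
    def __init__(self, visibility_timeout=30):
        self.timeout = visibility_timeout
        self.messages = {}   # id -> [body, invisible_until]
        self.next_id = 0

    def send(self, body):
        self.messages[self.next_id] = [body, 0.0]
        self.next_id += 1

    def receive(self):
        now = time.time()
        for mid, rec in self.messages.items():
            if rec[1] <= now:
                rec[1] = now + self.timeout   # hide while being processed
                return mid, rec[0]
        return None

    def delete(self, mid):                    # the "ack"
        self.messages.pop(mid, None)

q = MiniQueue(visibility_timeout=60)
q.send("work item")
mid, body = q.receive()
assert q.receive() is None   # invisible while a worker holds it
q.delete(mid)
assert q.receive() is None   # acked: gone for good
```

A real low-usage replacement would put the `messages` table in a database behind a REST endpoint, but the state machine is the same.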
Hello, thanks for the comment. I hope the following notes help clarify the ideas behind the design decisions in Disque:
> Acking is a bad idea. It requires the broker to manage a lot of state, including locking and timeouts.
I think it's much better to put the complexity on the broker than on the client. It's definitely a tradeoff, so many other people may not agree. Because of ACKs, a message will be delivered again to some client forever until there is clear proof that the client processed the message (the ACK). The client has very little room for errors. In the Kafka model, which makes sense when you want to do stream processing, the client has to handle the storage of the offset. When the offset is committed to the broker itself, the broker becomes a store that must guarantee certain kinds of consistency, which is considerably more complex than keeping state in an AP system. Given that Disque targets workloads which are not stream oriented, where the order of messages is not very important (beyond a best effort to serve first whoever arrived first, in the average case), putting the complexity of handling state on the client (by managing the offset), or on the broker (by solving a storage and consistency problem), seems a bad idea to me.
> Re-queuing invalidates total ordering.
This is a fundamental assumption of Disque: "causal/total order of messages is overrated". There are a few use cases where it is a very important feature to have, and a lot of use cases where it is not. When you don't have to guarantee order, a lot of scalability and robustness can be added to the system: it can be AP, it can handle the re-delivery of messages without putting complexity on the client, it can survive larger network issues, it does not need any strongly consistent component which can make operations harder, and so forth.
> On the performance side, parallel distributed queue consumption (which also breaks total ordering) is directly at odds with this model.
I don't agree with this statement. Because of this data model, different producers and consumers for the same huge queue can split the load among many nodes in a completely scalable way. But this does not prevent scaling many unrelated queues as well: different nodes will handle different queues, and Disque has mechanisms to improve affinity so that producers and consumers for the same queue will try to stay on the same nodes whenever possible.
Similarly not having to solve a storage problem (to commit clients offsets) makes a system easier to scale. To store something consistently is almost always going to cost some performance.
> Queues as opaque objects — you can only inspect by popping the top message, and you cannot access older, dequeued messages. Fortunately, Disque allows you to read the entire queue without mutating it, but it doesn't look like you can read old messages.
Disque has iterators to solve this problem. You can iterate, with filters, over the whole message space. However, note that in the Kafka model you have the problem that the queue is a linearly accessible object without clues about what was processed and what was not, since the broker itself (AFAIK) has no idea what the state of a job is: to be processed, already processed, processed 10 times with errors, and so forth. Again, this is a tradeoff; I'm much happier with more state on the broker than on the client for the use cases Disque is designed for.
> Complicated queue topologies (fanouts, dead letter queues, etc.) become a logical necessity of the strict FIFO structure. (These topologies need to be declared every time the client starts up, and introduces the possibility of schema conflicts.)
Not sure about this, since, as said, strict FIFO is neither targeted nor possible with Disque.
> Logical de-duping is probably not possible.
This is true (or not) of basically all messaging systems. Storing the offset on the client side (or committing it to the broker) is the same: you need a consistent store, somewhere, in order to make sure that after crash recovery events or network partitions you are not going to process the same messages again. Actually, in order to guarantee single processing you have to store the offset as part of the transaction that produces the effects of processing the message. The same can be achieved with any messaging system that guarantees delivery of the message, if you have a CP store somewhere. The trivial case is to use unique IDs in order to make processing idempotent, or to store state in the CP store so that out-of-order delivery of messages is not an issue, since you ignore every message which does not match the current state. It is mostly a matter of moving things to one place or the other. For all the cases where single processing is desirable but not extremely important, Disque's "cheap" best-effort single processing can be enough, and is very scalable.
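The unique-ID idempotency case mentioned above can be sketched in a few lines. Here a Python set stands in for the CP store; in a real system the ID check and the message's effects would live in the same transaction so a crash cannot double-apply:

```python
processed = set()  # stands in for a consistent (CP) store of handled IDs

def handle(msg):
    """Process a message at most once, even under redelivery."""
    if msg["id"] in processed:
        return False              # duplicate delivery: ignore it
    # ... the real effects of processing go here, ideally committed
    # atomically together with recording the ID below ...
    processed.add(msg["id"])
    return True

assert handle({"id": "m1", "body": "hello"}) is True
assert handle({"id": "m1", "body": "hello"}) is False  # redelivered copy ignored
```

With this in place, an at-least-once broker like Disque can redeliver freely: duplicates cost a lookup, not a double execution.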
> Last point: The fact that Disque calls its messages "jobs" makes me a little disappointed that it is, in fact, not a job management system. I'd love a solid, distributed job manager.
Disque is a general messaging system, but certain design characteristics are biased towards the idea that messages will be jobs. However, it is opinionated about what should and should not be inside the broker itself. If you want more, it's up to the client library to implement a more full-fledged job processing system on top of what Disque provides.
Last note: given that I wrote Disque to provide something to the OSS world, I was not very interested in competing with something in particular: Kafka or RabbitMQ or some other message queue. Actually the less overlap the better, so my attempt was to create something different. There are things at which existing technologies will be better and maybe there will be things at which Disque will be better.
It's extremely dangerous to run in production because it will lose data [1] by design.
RabbitMQ does not have a good strategy for recovering from partitions, which happens when a node is unable to talk to its peers. Partitions can occur not just from actual network hiccups but also simply due to high CPU or I/O load or benign VM migrations.
The underlying cause is that RabbitMQ is not multi-master by default. A queue is owned by a specific node, and if you have a partitioned cluster, that queue (and related objects such as exchanges and bindings) will simply disappear from other nodes.
You can patch this deficiency by enabling "high availability" (HA), which is their term for mirrored queues. Each queue will get a designated master, and be replicated to other nodes automatically. If a partition happens, the nodes elect a node to become a new master for a mirrored queue.
Unfortunately, this opens the cluster up to conflicts. Let's say you have two nodes, A and B, with one queue, Q. You experience a brief hiccup causing a partition. A and B will both assume the role of master for Q, because RabbitMQ has no quorum support, so they will both continue to accept messages from apps. The hiccup passes, and now both nodes see each other again. Meanwhile, apps have sent messages to both A and B, causing Q to diverge into Q^1 and Q^2.
However, RabbitMQ has no way to consolidate the two versions into a single queue. To fix this situation, you either need to reconstruct the queue manually (usually impossible from an application's point of view), or wipe it (hardly a solution in a production environment), or simply have RabbitMQ automatically pick a winning master and discard the other master(s). The latter strategy is called "autoheal", and will automatically pick the master which has the most messages. The previous master(s) are wiped and become slaves. This is, coincidentally, the only mode in which RabbitMQ can continue to run after a partition without manual intervention. Without autoheal, RabbitMQ will become unusable.
In practice, recovery has proved flaky for us. Nodes often stay partitioned even after they should be able to see each other. We have also encountered a lot of bugs — for example, bindings or exchanges disappear on some nodes but not on others, or queues are inexplicably lost, or nodes otherwise just misbehave. We're on a cloud provider which is otherwise rock solid; of all the software (databases etc.) we employ in our clusters, RabbitMQ is the only one that misbehaves. I should add that the last few minor versions have increased stability considerably, though the fundamental clustering design issue remains.
I wouldn't call this a design flaw; it just shifts the responsibility of handling conflicts to the consumer app rather than forcing you into a server-side solution. What kind of partition recovery you need is too application-specific. Most people don't mind losing a message. If you don't want to lose messages (CP mode?), just pin all clients for a certain app object (AMQP tree) to consuming from the same node and block until everything is healthy, or fail them over in some app-specific way to ensure consistency. While other AMQP servers may handle your case better, these things tend to be app-specific, since your app knows better than the server which trees are critical and which are not.
I would definitely call it a design flaw. After all, RabbitMQ pretends to be multi-master; but if you set up clients to treat a single node (of several) as a master, the other nodes will be reduced to being dumb backup nodes.
Also, don't forget that clients don't decide which node owns a queue — it's owned by the node where it's first created. Being consistent about always talking to the correct node, and keeping track of which node owns the master replica of any given queue, puts a complicating burden on both clients and the system administrator.
You can run RabbitMQ without any partition handling, but that means the ops staff needs to wake up in the middle of the night to handle downtime. Not to mention that there's no queue merging support — at the very least, it could be possible to merge conflicting queues where you don't particularly care about ordering.
If you look at the last paragraph, it seems aphyr has been involved in the development:
> I was not alone during the past months, while hacking with Disque and trying to figure out how to shape it, I received the help of: [...] Kyle Kingsbury, [...] Redis Labs and Pivotal, and probably more people I’m not remembering right now. Thank you for your help.
Good hint. I tried to run the initial tests Kyle (Aphyr) wrote for Disque in order to debug a problem, and after running Jepsen a few times you realize how cool it is and you want to run your own tests. However, Jepsen is a non-trivial system and requires some Clojure skill, so I postponed this activity for later, resorting to simpler systems to test Disque for now. But sooner or later I want to spend a few weeks learning Jepsen and enough Clojure to do my own tests.
However, note that the value of having Kyle test Disque is not just his ability to use Jepsen well (of course), but also the design of the tests, in order to stress the right things to find bugs. And finally, there is the whole analysis that is performed together with the test. So regardless of whether I learn Jepsen or not, I hope Disque will get some official testing with it. However, I understand Kyle is a single person and there is a huge queue of systems to be tested.
See the CAP theorem [0]. Disque is designed to guarantee availability and partition tolerance, which according to CAP means Disque cannot guarantee atomic consistency (the C in CAP is not quite the same as either the A or the C in ACID [1]) between nodes. No atomic consistency means no guarantee of a one-to-one correspondence between a single action and a single response, which means race conditions are possible.
Currently, we have a huge background processing system built with Sidekiq backed by Redis. The biggest problem we face today is having to run machines with a lot of memory, because jobs are queued in Redis and Redis works in-memory. These machines cost a lot.
I think the same issue will occur here as well, because Disque is also an in-memory store and persistence works the way it does in Redis.
ZeroMQ is not a messaging broker, it's a socket abstraction. (I interpret the zero in ZeroMQ to mean "no".) So they are not comparable at all. You can build a broker with ZeroMQ, which people have done [1], in the same sense that you can build a broker on plain TCP sockets.
There are plenty, but few/none that solve the problem Disque solves. You could easily use Kafka or RabbitMQ, but Disque aims to provide simplicity rather than be full-featured. @antirez described it as solving the case where you just need a queue with very basic guarantees and good performance without all the bells and whistles that a more complex system has. Disque is (spiritually, not technically) a lot like Mongo was five years ago compared to something much heavier like Postgres. In that sense, there are few (if any) production-ready alternatives that solve the same problem.
It should also be noted that you could probably successfully use Redis for much of what Disque does (Disque is based on Redis). Disque adds nice features, though, and removes most of the features of Redis that are outside its scope.
I found sidekiq to be better than celery for my needs because it was better integrated (although less configurable). It did what it said on the box without a fuss and without me having to debate the merits of and choose a backend, etc...
We have used nsq to ship billions of log messages daily for over 2 years. It is very sensible operationally and easy to configure to the level of robustness your use case requires. It does not allow seeking into the queue like Kafka, but we have never needed that capability, and we found it easier to design around this than to take on the operational burden.
It seems like Disque is a "simplified" Kafka, or a more vertically purposed Redis. It seems to differ significantly from RabbitMQ because Rabbit requires a queue to push messages into, while Disque allows jobs to be pushed independently of consumers being set up.
Kafka is arguably simpler, at least in terms of data model, than Disque. Disque needs to handle acking and queue mutation and things like lock timeouts and dead lettering, all of which complicates its queue structure and API. Kafka queues (partitions, to be precise) are append-only, which simplifies a lot of things.