In batch processing, a file is written once and then potentially read by multiple jobs. Analogously, in streaming terminology, an event is generated once by a producer (also known as a publisher or sender), and then potentially processed by multiple consumers (subscribers or recipients) [3]. In a filesystem, a filename identifies a set of related records; in a streaming system, related events are usually grouped together into a topic or stream.
[3] Patrick Th. Eugster, Pascal A. Felber, Rachid Guerraoui, and Anne-Marie Kermarrec: “The Many Faces of Publish/Subscribe,” ACM Computing Surveys, volume 35, number 2, pages 114–131, June 2003. doi:10.1145/857076.857078
A direct communication channel like a Unix pipe or TCP connection between producer and consumer would be a simple way of implementing a messaging system. However, most messaging systems expand on this basic model. In particular, Unix pipes and TCP connect exactly one sender with one recipient, whereas a messaging system allows multiple producer nodes to send messages to the same topic and allows multiple consumer nodes to receive messages in a topic.
This is the publish/subscribe model.
If the consumer exposes a service on the network, producers can make a direct HTTP or RPC request (see “Dataflow Through Services: REST and RPC” on page 131) to push messages to the consumer. This is the idea behind webhooks [12], a pattern in which a callback URL of one service is registered with another service, and it makes a request to that URL whenever an event occurs.
Webhooks rely on a callback URL; does that mean OAuth uses webhooks?
Message broker
When the producer and consumer do not exchange messages directly over the network (so that a crash of either one does not cause messages to be lost),
a common approach is to pass messages through a message broker:
message broker (also known as a message queue), which is essentially a kind of database that is optimized for handling message streams [13].
It is simply a server that connects producers and consumers.
It runs as a server, with producers and consumers connecting to it as clients. Producers write messages to the broker, and consumers receive them by reading them from the broker.
Acknowledgments and redelivery
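As in networking, an ack has to be returned and there is a redelivery mechanism. Distributed systems ultimately move data over the network anyway, so it is no surprise that they use similar mechanisms.
It is like DNS, which is itself distributed storage and uses locality (local DNS servers) to increase speed (reduce latency).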
Consumers may crash at any time, so it could happen that a broker delivers a message to a consumer but the consumer never processes it, or only partially processes it before crashing. In order to ensure that the message is not lost, message brokers use acknowledgments: a client must explicitly tell the broker when it has finished processing a message so that the broker can remove it from the queue.
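To make the ack/redelivery idea concrete for myself, here is a minimal in-memory sketch. The `Broker` class and its method names (`publish`, `deliver`, `ack`, `requeue_unacked`) are hypothetical and are not the API of any real broker; a real system would detect a dead consumer through a closed connection or a timeout, which this toy leaves to the caller.

```python
from collections import deque


class Broker:
    """Toy broker: a message is tracked until the consumer acknowledges it."""

    def __init__(self):
        self.queue = deque()   # messages waiting to be delivered
        self.in_flight = {}    # delivered but not yet acknowledged, keyed by id
        self.next_id = 0

    def publish(self, payload):
        self.queue.append((self.next_id, payload))
        self.next_id += 1

    def deliver(self):
        """Hand the next message to a consumer and remember it as in flight."""
        if not self.queue:
            return None
        msg_id, payload = self.queue.popleft()
        self.in_flight[msg_id] = payload
        return msg_id, payload

    def ack(self, msg_id):
        """The consumer finished processing, so the message can be forgotten."""
        self.in_flight.pop(msg_id, None)

    def requeue_unacked(self):
        """Assumed to be called when a consumer is presumed dead."""
        # Re-insert at the front, highest id first, so the original order is kept.
        for msg_id in sorted(self.in_flight, reverse=True):
            self.queue.appendleft((msg_id, self.in_flight.pop(msg_id)))


broker = Broker()
broker.publish("a")
broker.publish("b")

first = broker.deliver()        # consumer receives "a", then crashes without acking
broker.requeue_unacked()        # broker makes "a" deliverable again
redelivered = broker.deliver()  # "a" is delivered a second time
broker.ack(redelivered[0])      # this time processing completes
```

Note that redelivery means a message can be processed more than once, which is why idempotence (later in these notes) matters.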
Using logs for message storage
To get permanent storage, messages are stored in the form of a log.
To increase throughput, the log is also partitioned and spread across machines.
Within each partition, the broker assigns a monotonically increasing sequence number, or offset, to every message (in Figure 11-3, the numbers in boxes are message offsets). Such a sequence number makes sense because a partition is append-only, so the messages within a partition are totally ordered. There is no ordering guarantee across different partitions.
![[DDIA-Figure-11-3-分布式log 以及应用.png]]
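A rough sketch of a partitioned log with per-partition offsets. The `PartitionedLog` class is hypothetical, and choosing the partition by hashing a message key with `zlib.crc32` is my own assumption for illustration; the excerpt above only says that offsets are assigned within each partition.

```python
import zlib


class PartitionedLog:
    """Toy append-only log split into partitions; the offset is the list index."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, value):
        # Assumption: a stable hash of the key picks the partition, so all
        # messages with the same key land in the same partition.
        p = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        offset = len(self.partitions[p])  # next offset within this partition
        self.partitions[p].append((key, value))
        return p, offset

    def read(self, partition, offset):
        """Return every message in one partition from the given offset onward."""
        return self.partitions[partition][offset:]


log = PartitionedLog(num_partitions=2)
p1, o1 = log.append("user-1", "login")
p2, o2 = log.append("user-1", "click")
```

Both messages share a key, so they end up in the same partition with consecutive offsets and are totally ordered relative to each other. Messages in different partitions have no defined order.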
Logs vs traditional messaging
Thus, in situations where messages may be expensive to process and you want to parallelize processing on a message-by-message basis, and where message ordering is not so important, the JMS/AMQP style of message broker is preferable. On the other hand, in situations with high message throughput, where each message is fast to process and where message ordering is important, the log-based approach works very well.
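The key property of a batch process is that it never touches the original data: it reads from a file, then copies and processes the data into a new location.
A log-based message queue is similar: the consumer reads from the message broker without affecting the broker's log; only the consumer's own offset advances. To reread yesterday's messages, just go back to yesterday's offset.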
This aspect makes log-based messaging more like the batch processes of the last chapter, where derived data is clearly separated from input data through a repeatable transformation process. It allows more experimentation and easier recovery from errors and bugs, making it a good tool for integrating dataflows within an organization [24].
[24] Jay Kreps: “The Log: What Every Software Engineer Should Know About Real-Time Data’s Unifying Abstraction,” engineering.linkedin.com, December 16, 2013.
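A small sketch of why rereading is cheap in a log-based broker: consuming is a read-only operation, and the only state that changes is the consumer's offset. The `Consumer` class and its `poll`/`seek` methods are hypothetical names chosen for illustration, not a real client API.

```python
log = ["e0", "e1", "e2", "e3"]  # append-only partition; list index = offset


class Consumer:
    """Toy consumer that tracks its own position in a shared log."""

    def __init__(self, log, offset=0):
        self.log = log
        self.offset = offset

    def poll(self, max_messages=10):
        # Reading never modifies the log; only the consumer's offset moves.
        batch = self.log[self.offset:self.offset + max_messages]
        self.offset += len(batch)
        return batch

    def seek(self, offset):
        # Rewind (or skip ahead) by overwriting the stored offset.
        self.offset = offset


consumer = Consumer(log)
first_pass = consumer.poll()   # reads everything once
consumer.seek(1)               # go back to an earlier offset
replay = consumer.poll()       # rereads from offset 1 onward
```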
Thinking in reverse is always a useful way to approach a problem.
We can also go in reverse: take ideas from messaging and streams, and apply them to databases.
DDIA's way of drawing diagrams is well worth learning from, especially when reasoning about race conditions.
![[DDIA-Figure-11-4.png]]
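I still need to reread the part on state machine replication.
So in any system there must be one DB that stores the original data, and everything else is derived data storage that reads every change from this original (the source) to keep its own data in sync.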
We can call the log consumers derived data systems, as discussed in the introduction to Part III: the data stored in the search index and the data warehouse is just another view onto the data in the system of record. Change data capture is a mechanism for ensuring that all changes made to the system of record are also reflected in the derived data systems so that the derived systems have an accurate copy of the data.
Ha! My understanding matches Martin's: CDC (change data capture) treats one DB as the source/leader, then uses a message queue so that the other storage systems can follow the source's changes.
Essentially, change data capture makes one database the leader (the one from which the changes are captured), and turns the others into followers. A log-based message broker is well suited for transporting the change events from the source database, since it preserves the ordering of messages (avoiding the reordering issue of Figure 11-2).
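A minimal sketch of the follower side of CDC: each derived system applies the leader's change events in log order. The event shape (`op`, `key`, `value`) and the `apply_change` helper are assumptions for illustration and do not match the format of any specific CDC tool.

```python
def apply_change(store, change):
    """Apply one change event to a derived store (modelled here as a dict)."""
    if change["op"] == "delete":
        store.pop(change["key"], None)
    else:  # "upsert"
        store[change["key"]] = change["value"]


# Changes captured from the leader, in the order they were actually written.
changelog = [
    {"op": "upsert", "key": "x", "value": "A"},
    {"op": "upsert", "key": "x", "value": "B"},
    {"op": "upsert", "key": "y", "value": "C"},
    {"op": "delete", "key": "y"},
]

search_index, cache = {}, {}
for change in changelog:
    apply_change(search_index, change)
    apply_change(cache, change)
```

Because both followers see the same events in the same order, they converge to the same state as the leader.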
DDIA's references are seriously impressive...
LinkedIn’s Databus [25], Facebook’s Wormhole [26], and Yahoo!’s Sherpa [27] use this idea at large scale. Bottled Water implements CDC for PostgreSQL using an API that decodes the write-ahead log [28], Maxwell and Debezium do something similar for MySQL by parsing the binlog [29, 30, 31], Mongoriver reads the MongoDB oplog [32, 33], and GoldenGate provides similar facilities for Oracle [34, 35].
I think this point matters too: depending on the requirements, we abstract a model at a particular level, and that model can usually be applied in many different places.
The biggest difference is that event sourcing applies the idea at a different level of abstraction:
In change data capture, the application uses the database in a mutable way, updating and deleting records at will. The log of changes is extracted from the database at a low level (e.g., by parsing the replication log), which ensures that the order of writes extracted from the database matches the order in which they were actually written, avoiding the race condition in Figure 11-4. The application writing to the database does not need to be aware that CDC is occurring.
In event sourcing, the application logic is explicitly built on the basis of immutable events that are written to an event log. In this case, the event store is append-only, and updates or deletes are discouraged or prohibited. Events are designed to reflect things that happened at the application level, rather than low-level state changes.
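So CDC is like the checksum implemented at the link layer in networking, while event sourcing is like the checksum implemented at the transport layer: one is a lower-level implementation, the other is implemented at a higher level of abstraction.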
Event sourcing is a powerful technique for data modeling: from an application point of view it is more meaningful to record the user’s actions as immutable events, rather than recording the effect of those actions on a mutable database. Event sourcing makes it easier to evolve applications over time, helps with debugging by making it easier to understand after the fact why something happened, and guards against application bugs (see “Advantages of immutable events” on page 460).
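This is very similar to networking, where it is usually better to implement this kind of thing at a higher layer. You don't want the core network to carry all sorts of complex logic, because the edge is easier to update: the core should only move data, while the edge implements the logic and can be updated very easily.
OOD is the same... The open/closed principle works this way: closed for modification guarantees the lower layer is not changed casually, while open for extension essentially means making changes at a higher level of abstraction.
So when designing a system, storing events directly in a DB or message queue is a better choice than relying on CDC to keep things in sync.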
Specialized databases such as Event Store [46] have been developed to support applications using event sourcing, but in general the approach is independent of any particular tool. A conventional database or a log-based message broker can also be used to build applications in this style.
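A tool-independent sketch of the event sourcing style, using a plain list as the event log. The shopping-cart domain, the event type names, and the `record`/`cart_contents` helpers are all made up for illustration.

```python
events = []  # append-only event log; entries are never updated or deleted


def record(event_type, **data):
    """Append an application-level event describing what happened."""
    events.append({"type": event_type, **data})


def cart_contents(event_log):
    """Derive the current (mutable) state by replaying the events in order."""
    cart = {}
    for e in event_log:
        if e["type"] == "item_added":
            cart[e["item"]] = cart.get(e["item"], 0) + e["qty"]
        elif e["type"] == "item_removed":
            cart.pop(e["item"], None)
    return cart


record("item_added", item="book", qty=1)
record("item_added", item="pen", qty=2)
record("item_removed", item="pen")
record("item_added", item="book", qty=1)

current = cart_contents(events)      # state now
earlier = cart_contents(events[:2])  # state as of the second event
```

The removal is itself an event rather than a deletion, so the history of what the user did stays available for debugging or for building new views later.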
http://web.archive.org/web/20210827180824/http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper16.pdf
If you store the changelog durably, that simply has the effect of making the state reproducible. If you consider the log of events to be your system of record, and any mutable state as being derived from it, it becomes easier to reason about the flow of data through a system. As Pat Helland puts it [52]:
- Transaction logs record all the changes made to the database. High-speed appends are the only way to change the log. From this perspective, the contents of the database hold a caching of the latest record values in the logs. The truth is the log. The database is a cache of a subset of the log. That cached subset happens to be the latest value of each record and index value from the log.
Log compaction, as discussed in “Log compaction” on page 456, is one way of bridging the distinction between log and database state: it retains only the latest version of each record, and discards overwritten versions.
Profound!
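A tiny sketch of log compaction as described above: keep only the latest record per key. The `compact` function is hypothetical, and deletion markers (tombstones) are deliberately left out of this toy version.

```python
def compact(log):
    """Keep only the most recent record for each key, preserving log order."""
    # Later entries overwrite earlier ones, leaving the last index per key.
    last_index = {key: i for i, (key, _) in enumerate(log)}
    return [(k, v) for i, (k, v) in enumerate(log) if last_index[k] == i]


log = [("a", 1), ("b", 2), ("a", 3), ("c", 4), ("b", 5)]
compacted = compact(log)
```

Replaying either the full log or the compacted log yields the same final key-value state, which is the sense in which the database is a cache of the latest values in the log.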
Stream processing is not a new field; many systems have long used streams to monitor system health.
[68] Julian Hyde: “Data in Flight: How Streaming SQL Technology Can Help Solve the Web 2.0 Data Crunch,” ACM Queue, volume 7, number 11, December 2009. doi:10.1145/1661785.1667562
probabilistic algorithm for optimization
https://www.oreilly.com/radar/questioning-the-lambda-architecture/
Is a Bloom filter a probabilistic algorithm?
Might these open source stream processors all come up in Ian's class?
Many open source distributed stream processing frameworks are designed with analytics in mind: for example, Apache Storm, Spark Streaming, Flink, Concord, Samza, and Kafka Streams [74]. Hosted services include Google Cloud Dataflow and Azure Stream Analytics.
Flink seems to come up in a lot of places; should I look at the book David recommended?
Idempotence
Setting a key in a key-value store is an idempotent operation.
An idempotent operation is one that you can perform multiple times, and it has the same effect as if you performed it only once. For example, setting a key in a key-value store to some fixed value is idempotent (writing the value again simply overwrites the value with an identical value), whereas incrementing a counter is not idempotent (performing the increment again means the value is incremented twice).
With a bit of metadata, we can make an operation idempotent.
Even if an operation is not naturally idempotent, it can often be made idempotent with a bit of extra metadata. For example, when consuming messages from Kafka, every message has a persistent, monotonically increasing offset. When writing a value to an external database, you can include the offset of the message that triggered the last write with the value. Thus, you can tell whether an update has already been applied, and avoid performing the same update again.
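A sketch of the offset trick for a non-idempotent increment. The `CounterStore` class is hypothetical; the important assumption is that the value and the last applied offset are written together atomically, which a real database would need a transaction (or a single-row update) to guarantee.

```python
class CounterStore:
    """Toy external store that remembers the offset of the last applied message."""

    def __init__(self):
        self.value = 0
        self.last_offset = -1  # no message applied yet

    def apply_increment(self, offset, amount):
        # Offsets only increase, so anything at or below last_offset is a duplicate.
        if offset <= self.last_offset:
            return False
        # Assumption: these two writes happen atomically in a real system.
        self.value += amount
        self.last_offset = offset
        return True


store = CounterStore()
# The message at offset 1 is delivered twice, e.g. after a consumer restart.
deliveries = [(0, 5), (1, 3), (1, 3), (2, 2)]
results = [store.apply_increment(offset, amount) for offset, amount in deliveries]
```

The duplicate delivery is detected and skipped, so the counter ends up with the same value as if every message had been delivered exactly once.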
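Summary
This chapter is mainly about event streams; the most fundamental difference from batch processing is the unbounded input.
Seen from the unbounded-input angle, message brokers and event logs play the same role as a filesystem.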
From this perspective, message brokers and event logs serve as the streaming equivalent of a filesystem.
We spent some time comparing two types of message brokers:
AMQP/JMS-style message broker
- The broker assigns individual messages to consumers, and consumers acknowledge individual messages when they have been successfully processed. Messages are deleted from the broker once they have been acknowledged. This approach is appropriate as an asynchronous form of RPC (see also “Message-Passing Dataflow” on page 136), for example in a task queue, where the exact order of message processing is not important and where there is no need to go back and read old messages again after they have been processed.
Log-based message broker
- The broker assigns all messages in a partition to the same consumer node, and always delivers messages in the same order. Parallelism is achieved through partitioning, and consumers track their progress by checkpointing the offset of the last message they have processed. The broker retains messages on disk, so it is possible to jump back and reread old messages if necessary.
We distinguished three types of joins that may appear in stream processes:
Stream-stream joins
- Both input streams consist of activity events, and the join operator searches for related events that occur within some window of time. For example, it may match two actions taken by the same user within 30 minutes of each other. The two join inputs may in fact be the same stream (a self-join) if you want to find related events within that one stream.
Stream-stream joins are probably used mainly for stream analytics.
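A rough sketch of a windowed stream-stream join, matching events from two different streams for the same user within 30 minutes. The `window_join` function, the tuple layout, and the search/click example are assumptions for illustration. It also assumes events arrive sorted by timestamp, which real streams do not guarantee.

```python
from collections import defaultdict

WINDOW_SECONDS = 30 * 60  # the 30-minute window from the example above


def window_join(events):
    """Join events of the same user from different streams within the window.

    events: iterable of (timestamp_seconds, stream_name, user, payload),
    assumed to be sorted by timestamp.
    """
    buffered = defaultdict(list)  # user -> recent (timestamp, stream, payload)
    matches = []
    for ts, stream, user, payload in events:
        # Drop buffered events that have fallen out of the window.
        recent = [e for e in buffered[user] if ts - e[0] <= WINDOW_SECONDS]
        for _, other_stream, other_payload in recent:
            if other_stream != stream:
                matches.append((user, other_payload, payload))
        recent.append((ts, stream, payload))
        buffered[user] = recent
    return matches


events = [
    (0, "search", "u1", "q=shoes"),
    (600, "click", "u1", "result-3"),
    (700, "click", "u2", "result-9"),
    (4000, "click", "u1", "result-7"),
]
joined = window_join(events)
```

Only the click 10 minutes after the search is matched; the later click by the same user falls outside the window, and the other user never searched.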
Stream-table joins
- One input stream consists of activity events, while the other is a database changelog. The changelog keeps a local copy of the database up to date. For each activity event, the join operator queries the database and outputs an enriched activity event.
Stream enrichment (adding more information to the event, e.g. user data?)
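A minimal sketch of a stream-table join used for enrichment. The `enrich` function and the interleaved input format (`"profile"` changelog entries and `"activity"` events in one sequence) are assumptions made to keep the example short; in practice these would be two separate input streams.

```python
def enrich(stream):
    """Enrich activity events with the latest known profile of the user.

    stream: iterable of (kind, user_id, data) where kind is "profile"
    (a changelog entry) or "activity" (an event to enrich).
    """
    profiles = {}  # local copy of the table, kept up to date by the changelog
    enriched = []
    for kind, user_id, data in stream:
        if kind == "profile":
            profiles[user_id] = data
        else:
            enriched.append(
                {"user": user_id, "action": data, "profile": profiles.get(user_id)}
            )
    return enriched


stream = [
    ("profile", "u1", {"name": "Ann"}),
    ("activity", "u1", "view"),
    ("profile", "u1", {"name": "Anna"}),
    ("activity", "u1", "buy"),
    ("activity", "u2", "view"),
]
output = enrich(stream)
```

Each activity event is joined with whatever the local table copy holds at that moment, so the result depends on the order in which changelog entries and events are processed.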
Table-table joins
- Both input streams are database changelogs. In this case, every change on one side is joined with the latest state of the other side. The result is a stream of changes to the materialized view of the join between the two tables.
materialized view (derived data?)
from [[DDIA Ch12]]
Materialized views, which are a kind of precomputed cache of query results (see “Aggregation: Data Cubes and Materialized Views” on page 101)
[[DDIA Ch10#Materialization]] Writing existing data out to a new file is materialization.