Kafka原理分析

Kafka原理分析

介绍

Apache Kafka是一个分布式发布-订阅消息传递系统,最初由LinkedIn公司开发,后于2010年贡献给Apache 基金会并成为顶级开源项目。它是一个分布式的、可划分的、冗余备份的持久性的日志服务,主要用于处理活跃的流式数据。其架构如下图所示:

mark

Kafka的工作流程大致如下:

生产者会根据业务逻辑产生消息,之后根据路由规则将消息发送到指定分区的Leader副本所在的Broker上。在Kafka服务端接收到消息后,会将消息追加到Log中保存,之后Follower副本会与Leader副本进行同步,当ISR集合中所有副本都完成了此消息的同步后,则Leader副本的HW会增加,并向生产者返回响应。

当消费者加入到Consumer Group时,会触发Rebalance操作将分区分配给不同的消费者消费。随后,消
费者会恢复其消费位置,并向Kafka服务端发送拉取消息的请求,Leader副本会验证请求的offset以及其他相
关信息,最后返回消息。

Kafka的一些概念

Message

消息是Kafka中最基本的数据单元。消息由一串字节构成,其中主要由key和value构成,key和value也都是byte数组;

Broker

Kafka 集群包含一个或多个服务器,这种服务器被称为 Broker;

Topic

主题,Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic;

Partition

物理上的概念,每个 Topic 包含一个或多个 Partition,每个Partition内部是有序的;

每个消息在被添加到分区时,都会被分配一个offset,它是消息在此分区中的唯一编号,Kafka通过offset保证消息在分区内的顺序,offset的顺序性不跨分区,即Kafka只保证在同一个分区内的消息是有序的;同一Topic的多个分区内的消息,Kafka并不保证其顺序性。

副本

Kafka对消息进行了冗余备份,每个Partition可以有多个副本,每个副本中包含的消息是一样的。每个分区的副本集合中,都会选举出一个副本作为Leader副本,Kafka在不同的场景下会采用不同的选举策略。所有的读写请求都由选举出的Leader副本处理,其他都作为Follower副本,Follower副本仅仅是从Leader 副本处把数据拉取(pull)到本地之后,同步更新到自己的Log中。一般情况下,同一分区的多个分区会被分配到不同的Broker上,这样,当Leader所在的Broker宕机之后,可以重新选举新的Leader,继续对外提供服务。

如下图所示:



ISR集合

ISR(In-Sync Replica)集合表示的是目前“可用”(alive)且消息量与Leader相差不多的副本集合,这是整个副本集合的一个子集。“可用”和“相差不多”都是很模糊的描述,其实际含义是ISR集合中的副本必须满足下面两个条件:

  1. 副本所在节点必须维持着与ZooKeeper的连接;
  2. 副本最后一条消息的offset与Leader副本的最后一条消息的offset之间的差值不能超出指定的阈值。

每个分区中的Leader副本都会维护此分区的ISR集合。写请求首先由Leader副本处理,之后Follower副本会从Leader上拉取写入的消息,这个过程会有一定的延迟,导致Follower副本中保存的消息略少于Leader副本,只要未超出阈值都是可以容忍的。如果一个Follower副本出现异常,比如:宕机,发生长时间GC而导致Kafka僵死或是网络断开连接导致长时间没有拉取消息进行同步,就会违反上面的两个条件,从而被Leader副本踢出ISR集合。当Follower副本从异常中恢复之后,会继续与Leader副本进行同步,当Follower副本“追上”(即最后一条消息的offset的差值小于指定阈值)Leader副本的时候,此Follower副本会被Leader副本重新加入到ISR中。

HW

HW(HighWatermark)标记了一个特殊的offset,当消费者处理消息的时候,只能拉取到HW之前的消息,HW之后的消息对消费者来说是不可见的。与ISR集合类似,HW也是由Leader副本管理的。当ISR集合中全部的Follower副本都拉取HW指定消息进行同步后,Leader副本会递增HW的值。Kafka官方网站将HW之前的消息的状态称为“commit”,其含义是这些消息在多个副本中同时存在,即使此时Leader副本损坏,也不会出现数据丢失。

LEO

LEO(Log End Offset)是所有的副本都会有的一个offset标记,它指向追加到当前副本的最后一个消息的offset。当生产者向Leader副本追加消息的时候,Leader副本的LEO标记会递增;当Follower副本成功从Leader副本拉取消息并更新到本地的时候,Follower副本的LEO就会增加。下图展示了针对offset为11的消息,ISR集合、HW与LEO是如何协调工作的:



①Producer向此Partition推送消息。
②Leader副本将消息追加到Log中,并递增其LEO。
③Follower副本从Leader副本拉取消息进行同步。
④Follower副本将拉取到的消息更新到本地Log中,并递增其LEO。
⑤当ISR集合中所有副本都完成了对offset=11的消息的同步,Leader副本会递增HW。

在①~⑤步完成之后,offset=11的消息就对生产者可见了。

Kafka权衡了同步复制和异步复制两种策略,通过引入了ISR集合,巧妙地解决了上面两种复制策略存在的缺陷:

  • 当Follower副本的延迟过高时,Leader副本被踢出ISR集合,消息依然可以快速提交,生产者可以快速得到响应,避免高延时的Follower副本影响整个Kafka集群的性能。

  • 当Leader副本所在的Broker突然宕机的时候,会优先将ISR集合中Follower副本选举为Leader副本,新的Leader副本中包含了HW之前的全部消息,这就避免了消息的丢失。值得注意是,Follower副本可以批量地从Leader副本复制消息,这就加快了网络I/O,Follower 副本在更新消息时是批量写磁盘,加速了磁盘的I/O,极大减少了Follower与Leader的差距。

Producer

消息生产者,向Broker发送消息的客户端;生产者(Producer)的主要工作是生产消息,并将消息按照一定的规则推送(push)到Topic的分区中。这里选择分区的“规则”可以有很多种,例如:根据消息的key的Hash值选择分区,或按序轮询(Round-robin)全部分区的方式。

Consumer

消息消费者,从Broker读取消息的客户端;消费者(Consumer)的主要工作是从Topic中拉取消息,并对消息进行消费。某个消费者消费到Partition的哪个位置(offset)的相关信息,是Consumer自己维护的。

这样设计非常巧妙,避免了Kafka Server端维护消费者消费位置的开销,尤其是在消费数量较多的情况下。另一方面,如果是由Kafka Server端管理每个Consumer消费状态,一旦Kafka Server端出现延时或是消费状态丢失,将会影响大量的Consumer。同时,这一设计也提高了Consumer的灵活性,Consumer可以按照自己需要的顺序和模式拉取消息进行消费。例如:Consumer可以通过修改其消费的位置实现针对某些特殊key的消息进行反复消费,或是跳过某些消息的需求。

Consumer Group

在Kafka中,多个Consumer可以组成一个Consumer Group,一个Consumer只能属于一个Consumer Group。Consumer Group保证其订阅的Topic的每个分区只被分配给此Consumer Group中的一个消费者处理。如果不同Consumer Group订阅了同一Topic,Consumer Group彼此之间不会干扰。这样,如果要实现一个消息可以被多个消费者同时消费(“广播”)的效果,则将每个消费者放入单独的一个Consumer Group;如果要实现一个消息只被一个消费者消费(“独占”)的效果,则将所有的Consumer放入一个Consumer Group中。

注意,Consumer Group中消费者的数量并不是越多越好,当其中消费者数量超过分区的数量时,会导
致有消费者分配不到分区,从而造成消费者的浪费。

特性

  • 高吞吐量、低延迟:Kafka每秒可以处理几十万条消息,而它的延迟最低只有几毫秒;
  • 可拓展性:Kafka集群支持热拓展;
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);
  • 高并发:支持数千个客户端同时读写

参考资料

Java中锁的总结

  • Java中锁的总结

    一、锁的类型

    1. 自旋锁

    自旋锁是计算机科学用于多线程同步的一种锁,线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,直至显式释放自旋锁。 自旋锁避免了进程上下文的调度开销,因此对于线程只会阻塞很短时间的场合是有效的。因此操作系统的实现在很多地方往往用自旋锁。

线程的阻塞和唤醒需要CPU从用户态转为核心态,频繁的阻塞和唤醒对CPU来说是一件负担很重的工作。同时我们可以发现,很多对象锁的锁定状态只会持续很短的一段时间,例如整数的自加操作,在很短的时间内阻塞并唤醒线程显然不值得,为此引入了自旋锁。自旋锁的思想是让一个线程在请求一个共享数据的锁时执行忙循环(自旋)一段时间,如果在这段时间内能获得锁,就可以避免进入阻塞状态。

自旋锁有以下特点:

  • 用于临界区互斥
  • 在任何时刻最多只能有一个执行单元获得锁
  • 要求持有锁的线程所占用的时间尽可能短
  • 等待锁的线程进入忙循环状态

    2. 偏向锁

    偏向锁是在JDK 1.6之后加入的,其目的是消除数据在无竞争情况下的同步原语,进一步提高程序的运行性能。其思想是偏向于让第一个获得锁的线程,这个线程在之后获取该锁就不再需要进行同步操作,甚至连 CAS 操作也不再需要

当锁对象第一次被线程获得的时候,进入偏向状态,标记为 01。同时使用 CAS 操作将线程 ID 记录到 Mark Word 中,如果 CAS 操作成功,这个线程以后每次进入这个锁相关的同步块就不需要再进行任何同步操作。

当有另外一个线程去尝试获取这个锁对象时,偏向模式就宣告结束,此时撤销偏向(Revoke Bias)后恢复到未锁定状态或者轻量级锁状态。偏向锁、轻量级锁的状态转化如下所示:



3. 轻量级锁

轻量级锁是JDK 1.6之后加入的新型锁机制,是相对于传统的重量级锁而言,它使用 CAS 操作来避免重量级锁使用互斥量的开销。对于绝大部分的锁,在整个同步周期内都是不存在竞争的,因此也就不需要都使用互斥量进行同步,可以先采用 CAS 操作进行同步,如果 CAS 失败了再改用互斥量进行同步。

Mark Word(HotSpot虚拟机的对象头的第一部分)是实现轻量级锁和偏向锁的关键。对象头信息是与对象自身定义的数据无关的额外存储成本,它会根据对象的状态复用自己的存储空间。32位的Mark Word结构如下图示:



当尝试获取一个锁对象时,如果锁对象标记为 0 01,说明锁对象的锁为无锁(unlocked)状态。此时虚拟机在当前线程的虚拟机栈中创建 Lock Record,然后使用 CAS 操作将对象的 Mark Word 更新为 Lock Record 指针。如果 CAS 操作成功了,那么线程就获取了该对象上的锁,并且对象的 Mark Word 的锁标记变为 00,表示该对象处于轻量级锁状态。

如果 CAS 操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的虚拟机栈,如果是的话说明当前线程已经拥有了这个锁对象,那就可以直接进入同步块继续执行,否则说明这个锁对象已经被其他线程线程抢占了。如果有两条以上的线程争用同一个锁,那轻量级锁就不再有效,要膨胀为重量级锁。

轻量级锁能够提升程序同步性能的依据是“对于绝大部分的锁,在整个同步周期内都是不存在竞争的”,如果没有竞争,使用CAS操作能够避免使用互斥操作的开销。但如果存在锁竞争,除了互斥量的开销以外,还有额外的CAS操作,因此在竞争的情况下,轻量级锁会比传统的重量级锁更慢。

二、锁的策略

1. 乐观锁

总是假设最好的情况,每次去读数据的时候都认为别人不会修改,所以不会上锁, 但是在更新的时候会判断一下在此期间有没有其他线程更新该数据, 可以使用版本号机制CAS算法实现。 乐观锁适用于多读的应用类型,这样可以提高吞吐量,像数据库提供的类似于write_condition机制,其实都是提供的乐观锁。 在Java中java.util.concurrent.atomic包下面的原子变量类就是基于CAS实现的乐观锁。

2. 悲观锁

总是假设最坏的情况,每次去读数据的时候都认为别人会修改,所以每次在读数据的时候都会上锁, 这样别人想读取数据就会阻塞直到它获取锁 (共享资源每次只给一个线程使用,其它线程阻塞,用完后再把资源转让给其它线程)。 传统的关系型数据库里边就用到了很多悲观锁机制,比如行锁,表锁等,读锁,写锁等,都是在做操作之前先上锁。Java中的悲观锁就是Synchronized,AQS框架下的锁则是先尝试CAS乐观锁去获取锁,获取不到,才会转换为悲观锁,如ReentrantLock

乐观锁好比生活中乐观的人总是想着事情往好的方向发展,悲观锁好比生活中悲观的人总是想着事情往坏的方向发展。 这两种人各有优缺点,不能不以场景而定说一种人好于另外一种人。

三、加锁过程的描述

  1. 当代码进入同步块的时候,假设此对象没有被锁定(即无锁状态,锁的标志位为01)。虚拟机首先会将在当前线程的栈帧里面新建一个“锁记录”(Lock Record)的空间,用于存储锁对象当前Mark Word的一份拷贝(Displaced Mark Word);
  2. 将对象头的Mark Word拷贝到“锁记录”上;
  3. 虚拟机将使用CAS操作尝试将锁对象的Mark Word更新指向“锁记录”(Lock Record)。
  4. 如果更新成功,此时该线程就拥有了该对象的锁,并且对象头Mark Word的锁标志位改为00(轻量级锁);
  5. 如果上一步中的CAS操作更新失败,虚拟机首先检查对象头的Mark Word是否指向当前线程的栈帧,如果是,则说明当前线程已经拥有了当前对象的锁,那就可以直接进入同步快继续执行;
  6. 否则,说明当前锁已被其他线程占用,这时出现了两个以上的线程争用同一把锁,那轻量级锁就失效,膨胀成为重量级锁(锁标志为10)。Mark Word中存储的就是指向重量级锁的指针,后面等待锁的其他线程也要进入阻塞状态,而当前线程则使用自旋来获取锁。

    四、解锁过程的描述

  7. 通过CAS尝试把线程中复制的Displaced Mark Word对象替换当前对象的Mark Word;

  8. 如果替换成功,整个同步过程就完成了;
  9. 如果替换失败,说明有其他线程尝试过获取该锁(锁已膨胀),那就要在释放锁的同时,唤醒被挂起的线程。

    五、锁的内存语义

    当线程释放锁的时候,Java内存模型会把该线程对应的本地内存中的共享变量刷新到主内存中;

    而当线程获取锁的时候,Java内存模型会把该线程对应的本地内存置为无效,从而使得被监视器保护的临界区代码必须从主内存中读取共享变量。

    线程之间相当于通过主内存进行通信,如下图所示:



    六、锁的汇总



参考资料

  • 《深入理解Java虚拟机》

HTTP访问一个网站的过程详解

访问一个网站的过程详解

例如访问:http://www.baidu.com

HTTP请求的准备

浏览器会将http://www.baidu.com 这个域名发送给DNS服务器,让它解析成IP地址。由于HTTP是基于TCP协议的,先建立TCP连接,在HTTP 1.1的协议里面,默认开启了Keep-Alive,这样建立的TCP连接,就可以在多次请求中复用。

HTTP请求的发送

建立TCP连接以后,通过stream二进制流的方式传给对方,到了TCP层,会将二进制流变成一个报文段发送给服务器。

发送每个报文段的时候,都需要对方有一个回应ACK,来保证报文可靠地到达了对方。如果没有回应,那么TCP就会重新传输,直到可以到达。有时候同一个包可能被传输好多次,但这对于HTTP是透明的。

TCP层发送每一个报文的时候,都需要加上自己的地址(源地址)和它想要去的地方(目标地址),然后将源MAC和目标MAC放入MAC头,发送出去即可;若不在同一局域网内,就需要发送给网关,这时还需要发送ARP协议,来获取网关的MAC地址,然后将源MAC和网关MAC放入MAC头,发送出去。

网关收到包以后,发现MAC符合,取出目标IP,根据路由协议找到下一跳的路由器,获取下一跳路由器的MAC地址,将包发送给下一跳路由器。

这样路由器一跳一跳终于到达目标的局域网。这时候,最后一跳路由器能够发现,目标地址就在自己的某一个出口的局域网上。于是,在这个局域网上发送ARP,获得这个目标地址的MAC地址,将包发送出去。

目标机器发现MAC地址符合就将包收起来;发现IP地址符合,根据IP头中协议项,知道自己上一层是TCP协议,于是解析TCP头,里面有序列号,看一看这个序列包是不是我需要的,如果是就放入缓存中然后返回一个ACK,如果不是则丢弃。

TCP头里面还有端口号,HTTP的服务器正在监听这个端口号。于是,目标机器自然知道是 HTTP 服务器这个进程想要这个包,于是将包发给 HTTP 服务器。HTTP 服务器的进程看到,原来这个请求是要访问一个网页,于是就把这个网页发给客户端。

HTTP返回的发送

根据HTTP 响应报文的格式构造好返回的HTTP报文,接下来就是把这个报文发送出去。还是交给Socket去发送,还是交给TCP层,让TCP层将返回的HTML,也分成一个个小的段,并且保证每一段都可靠到达。

这些段加上 TCP 头后会交给 IP 层,然后把刚才的发送过程反向走一遍。虽然两次不一定走相同的路径,但是逻辑过程是一样的,一直到达客户端。

客户端发现 MAC 地址符合、IP 地址符合,于是就会交给 TCP 层。根据序列号看是不是自己要的报文段,如果是,则会根据 TCP 头中的端口号,发给相应的进程。这个进程就是浏览器,浏览器作为客户端也在监听某个端口。

当浏览器拿到了 HTTP 的报文。发现返回“200”,一切正常,于是就从正文中将 HTML 拿出来。HTML 是一个标准的网页格式。浏览器只要根据这个格式,展示出一个绚丽多彩的网页。

这就是一个正常的 HTTP 请求和返回的完整过程。

参考资料