Kafka第四篇——生产数据总体概括,源码解析分区策略,数据收集器,Sender发送线程,key值

目录

流程图以及总体概述

拦截器

分区器以及分区计算策略

为啥进行分区计算?

producer生产者怎么知道有哪些分区?

分区计算

如何自定义实现分区器?

想说的在图里啦!宝宝!💡 ​编辑

如果key值忘记传递了呢!?

数据校验

数据收集器

注意

Sender发送线程


流程图以及总体概述

producer进行发送record,record对象包含topic,key,value,partition,时间戳,通过拦截器,将数据信息发送给broker,但是咱们也不知道把数据信息发送给哪个broker,而我们的Metadata就可以获取出来这个,如下面代码就是获取到9092.获取到缓存,放在底层。然后经过key对象的序列化,value对象的序列化,对应在代码中就是,configMap.put()那两行,并且这个是必须写的。然后经过分区器,partition,每个数据需要发送到broker中,每个消息发送到特定的主题,主题分为多个分区。kafka在发送数据时候,可以将数据发送到指定主题的指定分区,kafka会自动决定将消息发送到那个分区。分区器有那种判断发送给那个broker。然后进行数据校验。在数据收集器当中,相当于一个缓冲池,将同一个主题的数据可以存放在一个队列中,按“批”为单位进行发送,提高效率,并且指定了每批的大小是16K,

数据已经缓存到数据收集器后,就可以进行发送数据喽!此时就不会按topic为单位进行发送了,就可以重新整合,以节点为主!(why??因为不同的topic可以发送给同一个节点呀傻瓜!也就是说,在缓冲区以topic为单位,在发送线程中以节点为单位)封装请求,然后放在缓冲区中。再由网络通信从缓冲区中取出,发送给socket。在缓冲区,需要注意概念,在途请求缓冲区为5,表示同一个节点同一时间处理的请求数量。

拦截器

数据的规范化处理。可以有多个,可以按顺序执行数据的被拦截。和框架那块的一样。

onsend方法就是主要进行执行拦截规则的,for(ProducerInterceptor<K,V> interceptor:this.intercept)就可以循环执行多个拦截器,并且,看try,catch内容,无论当前拦截器发生什么异常,都不会影响到下一个拦截器的执行,更不会影响整个数据的发送。

自定义实现拦截器,帮助自己更好地了解拦截器。

java
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;

public class ValueInterceptorTest implements ProducerInterceptor<String, String> {

    /**
     * 实现拦截器规则
     *
     */
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        
    }

    /**
     * 当记录被Broker确认接收时调用
     *
     * 
     */
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        // 这个方法在记录被Broker确认接收时被调用
        // 根据确认情况实现自定义的处理逻辑
    }

    /**
     * 关闭拦截器时调用
     */
    @Override
    public void close() {
        
    }

    /**
     * 配置拦截器时调用
     *
     *configs 配置信息
     */
    @Override
    public void configure(Map<String, ?> configs) {
        
    }
}

分区器以及分区计算策略

为啥进行分区计算?

 数据发送给某个主题,主题会有很多分区,会在不同的broker当中,所以要算分区编号,不然连数据要发送给主题哪个节点都不知道。但是分区标号也得有范围呀!

producer生产者怎么知道有哪些分区?

从元数据缓存中获取到producer需要的主题相关信息

意味着只要元数据信息缓存了,主题的相关信息我们就可以拿到。

 分区器通过Matadata获取到分区,副本id,leadid之类的,

分区计算

¹²³⁴ 如果参数中指定了分区编号就直接返回

如何自定义实现分区器?

1.实现partitioner接口, 重写相关方法。感觉主要就是实现partition方法。

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    /**
     * 配置分区器
     *
     * @param configs 配置信息
     */
    @Override
    public void configure(Map<String, ?> configs) {
       

    }

    /**
     * 计算分区
     *
     * @param topic       主题名称
     * @param key         消息键,可以为null
     * @param keyBytes    消息键的字节数组表示,可以为null
     * @param value       消息值
     * @param valueBytes  消息值的字节数组表示
     * @param cluster     Kafka集群信息
     * @return 分配的分区ID
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        // 如果键为null,则使用轮询分区策略
        if (keyBytes == null) {
            return Utils.toPositive(Utils.murmur2(valueBytes)) % numPartitions;
        }

        // 使用键的hashCode来计算分区
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    /**
     * 关闭分区器
     */
    @Override
    public void close() {
        // 可以在这里进行资源的清理操作,通常分区器不需要进行额外的关闭操作
    }
}

想说的在图里啦!宝宝!💡 

嘿嘿,这里解决了之前的问题,key并不像之前学到的hashmap中消费者用来消费的key,它的核心作用就是用来进行分区计算

这个点就可以从:没有指定特定的分区标号,并且分区标号没有超过范围!序列化key以及分区器不忽略key的情况下看出来。partitionForKey()方法中就用不加密的hash算法并且对分区数量进行取余处理计算。

如果key值忘记传递了呢!?

return RecordMetadata.UNKNOWN_PARTITION;(这是一个表示未知分区的常量)。表明当前生产者无法确定消息发送到哪个分区,可能需要进一步处理或记录错误信息。那感觉也不太对啊,不知道把key发送到哪一个分区!

其实他是在数据收集器那一步追加了,看这个accumulator.append方法!

点进去哦!分区标号计算:粘性分区策略

如果没有进行传递key参数,也就是当前分区是未知分区,就会根据当前主题的分区负载情况来动态获取分区标号。这就是一种优化后的粘性分区策略!如图1.1

🤔图1.1  当前分区是未知分区,就会根据当前主题的分区负载因子来动态获取分区标号。

会根据当前分区负载情况判断去那个分区!如图1.2

✅当分区负载情况为空,就动态去随机选择分区,然后就尽可能的给这个分区追加数据(粘性分区策略),并且也不能超过数值batch.size=16K。如果超过这个阈值就会切换到下一个分区。并且更新分区负载情况。

✅当前主题分区负载情况不为空,那就不用随机生成了。会根据分区负载使用频率随机生成一个随机权重,然后利用二分查找算法找与权重相近的值,根据这个值获取到相应的分区,就可以得到我们的分区标号啦!

图1.2

数据校验

当数据校验成功,数据就到达了数据收集器当中。数据收集器,生产的数据作为一个临时的存储。

数据收集器

 

如果直接生产一条数据就通过网络通信来发送,这样做效率很低哦!像javaio流读取文件一样,读一个字节写一个字节,性能很低呀!

所以就有了ProducerBatch双端队列,从很减少频繁的网络交互,提高传输效率!

在神魔时候真正进行网络交互呢??

嘿嘿,看最大范围,batch.size=16K。在前面分区计算中,有一个粘性分区策略(一旦确定了一个分区,就尽可能往这个分区中追加数据,追加数据就是往producebatch中追加数据,当到达16K,就会被sender检测到),里面就有“没有传递key,如果没有分区负载情况,就会随机生成分区,不能超过最大

注意

🤔而且这里的16k意思是超过16k就不再接收数据了,不意味着数据不能超过16k!比如数据是20k,kafka要保证数据的完整性,发现这个数据值大于16k,就立马关闭,不再接收!

Sender发送线程

 kafka底层就采用了很多生产者消费者模型,一个放一个取。数据收集器是按照主题分区来放数据,而Sender发送线程会按照broker重新整合。(主题的不同分区会放在不同的节点当中,所以有可能存在不同主题的分区在同一个节点当中)。

当整合好之后,就会封装成produceRequest,进而发送给网络客户端。

默认发送时间0,也就是消息取过来就可以直接发送了!

注意这个在途请求缓冲区数量:5

  • Broker 和 Topic:每个 broker 可以存储一个或多个 topic 的数据分片,Kafka 集群的每个 broker 都可以服务于多个 topic
  • Topic 和 分区:每个 topic 可以被分为多个分区,分区内的消息顺序是有序的,而不同分区之间的消息顺序则不保证,分区允许 Kafka 横向扩展和提高并行处理能力。
  • Broker 和 分区:每个 broker 可能会存储多个 topic 的多个分区数据,这样在整个 Kafka 集群中就形成了数据的分布式存储和处理能力。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/782510.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

前端后花园周刊vol.18-React Native 称唯一推荐的社区框架是Expo

⚡️行业动态 React Native 团队推荐使用 Expo 框架构建应用程序 React Native 发文称&#xff1a;唯一推荐的社区框架是Expo&#xff0c;Expo 的开发者从 React Native 早期就开始支持 React Native 生态系统&#xff0c;相信 Expo 提供的开发者体验是同类中最好的。 &…

vscode调试教程

VSCode调试 VSCode Debuggers VSCode使用launch.json进行细粒度的控制&#xff0c;可以启动程序或将其附加到复杂的调试场景中 打开Run and Debug视图Ctrl Shift D 点击create a launch.json file&#xff0c;选择C(GDB/LLDB) 会在工作目录自动创建.vscode/launch.json文…

简单的基追踪一维信号降噪方法(MATLAB 2018)

基追踪法是基于冗余过完备字典下的一种信号稀疏表示方法。该方法具有可提高信号的稀疏性、实现阈值降噪和提高时频分辨率等优点。基追踪法采用表示系数的范数作为信号来度量稀疏性&#xff0c;通过最小化l型范数将信号稀疏表示问题定义为一类有约束的极值问题&#xff0c;进而转…

【linux服务器】大语言模型实战教程:LLMS大模型部署到个人服务器或嵌入式开发板(保姆级教学)

&#x1f3ac; 鸽芷咕&#xff1a;个人主页 &#x1f525; 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想&#xff0c;就是为了理想的生活! 引言 说到大语言模型相信大家都不会陌生&#xff0c;大型语言模型(LLMs)是人工智能文本处理的主要类型,也现在最流行的人工智能…

julia系列17: tsp问题代码整理

1. 常用库和基础函数 这里是优化版的函数&#xff1a; using TSPLIB,LKH,Distances,PyPlot MaxNum 10000 tspreadTSPLIB(:att48) dist [round.(Int,euclidean(tsp.nodes[i,:],tsp.nodes[j,:])) for i in 1:tsp.dimension,j in 1:tsp.dimension]; pos(tsp::TSP,t::Vector{In…

Games101学习笔记 Lecture17 Materials and Appearances

Lecture17 Materials and Appearances 材质 BRDF一、Diffuse/Lambertian Material二、Glossy Material三、Ideal reflective/ refractive Material (BSDF)1.镜面反射2.镜面折射3.菲涅尔项 Fresnel 四、Microfacet BRDF 微表面五、Isotropic / Anisotropic Materials (BRDFs)An…

python - 文件 / 永久存储:pickle / 异常处理

一.文件 利用help(open)可以看到open()函数的定义&#xff1a; >>> help(open) Help on built-in function open in module _io:open(file, moder, buffering-1, encodingNone, errorsNone, newlineNone, closefdTrue, openerNone) 默认打开模式是’rt’&#xff0…

王者荣耀与和平精英的语音识别不准确怎么办?分享一次意想不到的解决经历!

文章目录 📖 介绍 📖🏡 演示环境 🏡📒 完整经历 📒🔍 问题初现 🔍🔎 排查之路:从绝望到希望的转折 🔎🎉 顿悟时刻:原来是“她”的恶作剧 🎉⚓️ 相关链接 ⚓️📖 介绍 📖 作为一位打字速度惊人的玩家,我向来自豪于能在王者荣耀和和平精英等游戏…

Three.js机器人与星系动态场景(四):封装Threejs业务组件

实际在写业务的时候不会在每个组件里都写几十行的threejs的初始化工作。我们可以 将通用的threejs的场景、相机、render、轨道控制器等进行统一初始化。同时将非主体的函数提到组件外部&#xff0c;通过import导入进组件。将业务逻辑主体更清晰一些。下面的代码是基于reactthre…

DHCP与TCP的简单解析

目录 一、DHCP 1.1 DHCP概述 1.2 DHCP的优势 1.3 DHCP的模式与分配方式***** 1.3.1 DHCP的模式&#xff1a;C/S模式&#xff08;客户机与服务器模式&#xff09; 1.3.2 DHCP的分配方式 1.4 DHCP的租约过程及原理 1.4.1 DHCP的工作原理***** 1.4.2 更新租约原理***** …

智慧校园-基础平台功能总体概述

智慧校园基础平台是现代教育信息化的核心&#xff0c;它集成了系统管理、基础数据、系统监控、系统工具、流程管理等关键功能&#xff0c;构建了一个全面、智能、安全的校园生态系统。系统管理部分&#xff0c;通过权限管理和用户管理&#xff0c;实现了对用户访问权限的精细化…

使用qt creator配置msvc环境(不需要安装shit一样的宇宙第一IDE vs的哈)

1. 背景 习惯使用Qt编程的童鞋&#xff0c;尤其是linux下开发Qt的童鞋一般都是使用qt creator作为首选IDE的&#xff0c;通常在windows上使用Qt用qt creator作为IDE的话一般编译器有mingw和msvc两种&#xff0c;使用mingw版本和在linux下的方式基本上一样十分简单&#xff0c;不…

warning: GOPATH set to GOROOT (D:\go) has no effect

warning: GOPATH set to GOROOT (D:\go) has no effect gopath 设置一下&#xff0c;并且不要和 goroot 设置成同一个目录

【carla】ubuntu安装carla环境

我们可以通过查看 CARLA 的 GitHub release 页面来找到最新版本的下载链接。 下载 CARLA 压缩包 访问 CARLA Releases 页面&#xff1a; CARLA Releases on GitHub 查找最新版本&#xff1a; 找到最新的版本&#xff0c;点击下载&#xff0c;第一个压缩包 3. 解压 CARLA 包&…

在先企业字号被申请注册成商标!

今天一网友联系普推商标知产老杨&#xff0c;说自己注册的商标被某公司无效宣告了&#xff0c;去年联系老杨时&#xff0c;当时就给说这个商标名称存在风险&#xff0c;与别人的字号权存在高度近似&#xff0c;而且是同行业同地区在后面注册的。 十几年前某公司先成功注册成字号…

AI Agent【项目实战】:MetaGPT遇上元编程,重塑复杂多智能体协作的边界

AI Agent【项目实战】&#xff1a;MetaGPT遇上元编程&#xff0c;重塑复杂多智能体协作的边界 MetaGPT 以一条需求作为输入&#xff0c;并输出用户故事/竞争分析/需求/数据结构/API/文档等。内部而言&#xff0c;MetaGPT 包含产品经理/架构师/项目经理/工程师等角色。它为软件…

树目标、抓过程、要结果

一个好的管理理念不会因为一两个成功案例而发扬&#xff0c;一定是有无数个案例验证了它的价值所在&#xff0c;既然OKR在国外已经取得成功&#xff0c;那么国内依然如此。那么OKR这么成功&#xff0c;它到底好在哪呢&#xff1f; 一、OKR是连接企业战略和落地执行的最佳方式。…

ftp服务

1.什么是FTP FTP&#xff08;文件传输协议&#xff09;是典型的C/S架构的应用层协议&#xff0c;需要由服务端软件、客户端软件两个部分共同实现文件传输功能。FTP客户端和服务器之间的连接是可靠的&#xff0c;面向连接的&#xff0c;为数据的传输提供了可靠的保证。tcp协议&a…

1.2 如何让机器说人话?万字长文回顾自然语言处理(NLP)的前世今生 —— 《带你自学大语言模型》系列

本系列目录 《带你自学大语言模型》系列部分目录及计划&#xff0c;完整版目录见&#xff1a;带你自学大语言模型系列 —— 前言 第一部分 走进大语言模型&#xff08;科普向&#xff09; 第一章 走进大语言模型 1.1 从图灵机到GPT&#xff0c;人工智能经历了什么&#xff1…

【3GPP核心网】【5G】精讲5G核心网系统架构主要特征

目录 前言 1. 5G核心网系统架构主要特征 1.1 5G核心网与4G核心网EPC区别 1.2 5G核心网系统架构主要特征 2. 5G网络逻辑架构 2.1 新型基础设施平台 2.2 逻辑架构 前言 首先需要理解核心网的角色定位&#xff0c;作为移动通信网络的核心部分&#xff0c;核心网起着承上启下的作用…