PHP rdkafka producer: usage essentials and a partitioner question

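Background: our project records the run time of each API endpoint and uses a Kafka service to send warning emails to the responsible people when an endpoint runs over its time limit.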


1 Steps for using the producer in the PHP Kafka extension

1.1 Create a Producer instance

$produce = new \Rdkafka\Producer();

1.2 Add Kafka brokers (a single Kafka server or a Kafka cluster)

$produce->addBrokers($brokerLists);//brokerLists:127.0.0.1:9092

The addBrokers method is inherited from RdKafka, the parent class of Producer:

RdKafka\Producer extends RdKafka {
	/* Methods */
	/* Inherited methods */
	public RdKafka::addBrokers ( string $broker_list ) : integer
	public RdKafka\Metadata RdKafka::getMetadata ( bool $all_topics , RdKafka\Topic $only_topic = NULL , int $timeout_ms )
	public RdKafka::getOutQLen ( void ) : integer
	public RdKafka\Topic RdKafka::newTopic ( string $topic_name [, RdKafka\TopicConf $topic_conf = NULL ] )
	public RdKafka::poll ( integer $timeout_ms ) : void
	public RdKafka::flush ( integer $timeout_ms ) : integer
	public RdKafka::purge ( integer $purge_flags ) : integer
	public RdKafka::setLogLevel ( integer $level ) : void
	public RdKafka::setLogger ( integer $logger_id ) : void
	public RdKafka::queryWatermarkOffsets ( string $topic , integer $partition , integer &$low , integer &$high , integer $timeout_ms ) : void
	public RdKafka::offsetsForTimes ( array $topicPartitions , integer $timeout_ms ) : array
}
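
1.3 Get a topic instance

$topic = $produce->newTopic($topic);//topic:test

1.4 Produce and send a message

public RdKafka\ProducerTopic::produce ( integer $partition , integer $msgflags , string $payload [, string $key = NULL ] ) : void
$topic->produce(RD_KAFKA_PARTITION_UA, 0, $interfaceinformation);

Messages are produced and sent through the topic's produce method. The first argument, $partition, must be either an explicit partition ID or RD_KAFKA_PARTITION_UA (unassigned, which leaves the choice to the partitioner); any other value is invalid.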


/**
 * @brief Assigns a message to a topic partition using a partitioner.
 *
 * @param do_lock if RD_DO_LOCK then acquire topic lock.
 *
 * @returns RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION or .._UNKNOWN_TOPIC if
 *          partitioning failed, or 0 on success.
 *
 * @locality any
 * @locks rd_kafka_
 */
int rd_kafka_msg_partitioner (rd_kafka_topic_t *rkt, rd_kafka_msg_t *rkm,
                              rd_dolock_t do_lock) {
	int32_t partition;
	rd_kafka_toppar_t *rktp_new;
	rd_kafka_resp_err_t err;

	if (do_lock)
		rd_kafka_topic_rdlock(rkt);

        switch (rkt->rkt_state)
        {
        case RD_KAFKA_TOPIC_S_UNKNOWN:
                /* No metadata received from cluster yet.
                 * Put message in UA partition and re-run partitioner when
                 * cluster comes up. */
		partition = RD_KAFKA_PARTITION_UA;
                break;

        case RD_KAFKA_TOPIC_S_NOTEXISTS:
                /* Topic not found in cluster.
                 * Fail message immediately. */
                err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
		if (do_lock)
			rd_kafka_topic_rdunlock(rkt);
                return err;

        case RD_KAFKA_TOPIC_S_EXISTS:
                /* Topic exists in cluster. */

                /* Topic exists but has no partitions.
                 * This is usually an transient state following the
                 * auto-creation of a topic. */
                if (unlikely(rkt->rkt_partition_cnt == 0)) {
                        partition = RD_KAFKA_PARTITION_UA;
                        break;
                }

                /* Partition not assigned, run partitioner. */
                if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA) {
                        partition = rkt->rkt_conf.
                                partitioner(rkt,
                                            rkm->rkm_key,
					    rkm->rkm_key_len,
                                            rkt->rkt_partition_cnt,
                                            rkt->rkt_conf.opaque,
                                            rkm->rkm_opaque);
                } else
                        partition = rkm->rkm_partition;

                /* Check that partition exists. */
                if (partition >= rkt->rkt_partition_cnt) {
                        err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;
                        if (do_lock)
                                rd_kafka_topic_rdunlock(rkt);
                        return err;
                }
                break;

        default:
                rd_kafka_assert(rkt->rkt_rk, !*"NOTREACHED");
                break;
        }

	/* Get new partition */
	rktp_new = rd_kafka_toppar_get(rkt, partition, 0);

	if (unlikely(!rktp_new)) {
		/* Unknown topic or partition */
		if (rkt->rkt_state == RD_KAFKA_TOPIC_S_NOTEXISTS)
			err = RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC;
		else
			err = RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION;

		if (do_lock)
			rd_kafka_topic_rdunlock(rkt);

		return  err;
	}

        rd_atomic64_add(&rktp_new->rktp_c.producer_enq_msgs, 1);

        /* Update message partition */
        if (rkm->rkm_partition == RD_KAFKA_PARTITION_UA)
                rkm->rkm_partition = partition;

	/* Partition is available: enqueue msg on partition's queue */
	rd_kafka_toppar_enq_msg(rktp_new, rkm);
	if (do_lock)
		rd_kafka_topic_rdunlock(rkt);

        if (rktp_new->rktp_partition != RD_KAFKA_PARTITION_UA &&
            rd_kafka_is_transactional(rkt->rkt_rk)) {
                /* Add partition to transaction */
                rd_kafka_txn_add_partition(rktp_new);
        }

	rd_kafka_toppar_destroy(rktp_new); /* from _get() */
	return 0;
}


The code above is the source code behind the produce method; see the extension project's source for the full file.
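
To make the control flow of the C function easier to follow, here is a simplified model of it in plain PHP. It is only a sketch: the function name assignPartition, the state strings, and the returned error strings are illustrative names of my own, not part of the extension or of librdkafka, and locking, queues, and transactions are left out.

```php
<?php
// Simplified model of rd_kafka_msg_partitioner(); NOT the extension's API.
// PARTITION_UA mirrors RD_KAFKA_PARTITION_UA, which is -1 in librdkafka.
const PARTITION_UA = -1;

/**
 * @param string   $topicState   'unknown', 'notexists' or 'exists' (illustrative names)
 * @param int      $partitionCnt number of partitions known for the topic
 * @param int      $requested    explicit partition ID, or PARTITION_UA
 * @param callable $partitioner  function(int $partitionCnt): int
 * @return int|string partition to enqueue on, or an error name
 */
function assignPartition(string $topicState, int $partitionCnt, int $requested, callable $partitioner)
{
    switch ($topicState) {
        case 'unknown':
            // No metadata yet: park the message in the UA partition.
            return PARTITION_UA;
        case 'notexists':
            // Topic not found in the cluster: fail immediately.
            return 'UNKNOWN_TOPIC';
        case 'exists':
            if ($partitionCnt === 0) {
                // Topic exists but has no partitions yet (e.g. just auto-created).
                return PARTITION_UA;
            }
            // Run the partitioner only when no explicit partition was given.
            $partition = ($requested === PARTITION_UA)
                ? $partitioner($partitionCnt)
                : $requested;
            // An ID outside the known range does not match any partition.
            if ($partition < 0 || $partition >= $partitionCnt) {
                return 'UNKNOWN_PARTITION';
            }
            return $partition;
        default:
            throw new InvalidArgumentException("unexpected topic state: $topicState");
    }
}
```

Read this way, RD_KAFKA_PARTITION_UA is the only value that triggers the partitioner, and an explicit ID is accepted only if it is smaller than the topic's current partition count.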


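1.5 Check for messages that are still waiting to be sent or to be acknowledged by the server, and get their count

$produce->getOutQLen()

1.6 Trigger events periodically from the client, so that the callbacks serving the queued messages run to completion and the messages are actually sent and acknowledged

$produce->poll(50);

See the official documentation of the poll method.


Steps 1.5 and 1.6 are the key points of sending messages.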
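
Steps 1.5 and 1.6 are usually combined into a loop that keeps polling until the outbound queue is empty. A minimal sketch follows, relying only on the getOutQLen() and poll() methods listed in 1.2. The helper name drainOutQueue and the $maxRounds cap are my own additions (assumptions, not part of the extension), so that the loop cannot spin forever when the broker is unreachable.

```php
<?php
/**
 * Poll until the producer's outbound queue is empty or $maxRounds is reached.
 *
 * $producer is expected to be an RdKafka\Producer, but any object exposing
 * getOutQLen() and poll() works, which keeps this hypothetical helper testable.
 *
 * @return int number of messages still queued when the loop ends (0 = all handled)
 */
function drainOutQueue($producer, int $pollTimeoutMs = 50, int $maxRounds = 200): int
{
    for ($round = 0; $round < $maxRounds && $producer->getOutQLen() > 0; $round++) {
        // poll() serves delivery callbacks and other queued events.
        $producer->poll($pollTimeoutMs);
    }
    return $producer->getOutQLen();
}
```

Called right after $topic->produce(...), for example as $left = drainOutQueue($produce);, a non-zero return value means some messages were not confirmed within the allowed rounds and is worth logging.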


2 Uneven data across partitions: does the partitioner use a round-robin strategy?



As the figure shows, data is unevenly distributed across the partitions, which wastes resources.


2.1 Partitioners provided by rdkafka

For the specific partitioners that rdkafka provides, see the linked source file.


2.2 The default partitioner

int32_t rd_kafka_msg_partitioner_random (const rd_kafka_topic_t *rkt,
					 const void *key, size_t keylen,
					 int32_t partition_cnt,
					 void *rkt_opaque,
					 void *msg_opaque) {
	int32_t p = rd_jitter(0, partition_cnt-1);
	if (unlikely(!rd_kafka_topic_partition_available(rkt, p)))
		return rd_jitter(0, partition_cnt-1);
	else
		return p;
}
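
It picks a random integer within the range of valid partition IDs, so in theory every partition is chosen with the same probability, 1/n (n > 0). The partitioners shipped with the extension do not include a round-robin implementation. Because the choice is uniform, random assignment does not by itself waste resources: over a large number of messages the partitions receive nearly equal shares, although a small sample can still look skewed. Note that recent librdkafka documentation lists consistent_random as the default value of the partitioner property; for messages produced without a key, as in 1.4, it falls back to the random choice shown here.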
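
The 1/n argument can be checked with a small simulation in plain PHP. This sketch does not call the extension: mt_rand() stands in for rd_jitter(), and both function names are illustrative. It also shows one possible way to get strict round-robin, namely computing the partition ID in the application and passing it to produce() instead of RD_KAFKA_PARTITION_UA.

```php
<?php
/** Count how many of $messages land on each of $partitionCnt partitions when chosen at random. */
function simulateRandomPartitioner(int $partitionCnt, int $messages): array
{
    $counts = array_fill(0, $partitionCnt, 0);
    for ($i = 0; $i < $messages; $i++) {
        // Same range as rd_jitter(0, partition_cnt - 1): both ends inclusive.
        $counts[mt_rand(0, $partitionCnt - 1)]++;
    }
    return $counts;
}

/** Return a closure that yields 0, 1, ..., $partitionCnt - 1, 0, 1, ... on successive calls. */
function roundRobinPicker(int $partitionCnt): callable
{
    $next = 0;
    return function () use (&$next, $partitionCnt): int {
        $current = $next;
        $next = ($next + 1) % $partitionCnt;
        return $current;
    };
}

// With few messages the random counts can look skewed; with many they even out.
print_r(simulateRandomPartitioner(4, 20));
print_r(simulateRandomPartitioner(4, 100000));
```

With the picker, a call such as $topic->produce($pick(), 0, $payload) would target the partitions in strict rotation, provided the count passed to roundRobinPicker() matches the topic's real partition count, which the application would have to look up itself (for example through getMetadata()).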


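Summary: runtime environments are complex and ever-changing. Besides handling all kinds of exceptional conditions, mature code also needs to actively monitor how it is running.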

