produce($topic, $message); }

    /**
     * Forwards a consume request to the shared client object.
     *
     * The three arguments are passed through unchanged, in the same order,
     * to the client's own consume() method and its result is returned as-is.
     *
     * @param mixed $topic Topic identifier handed to the client.
     * @param mixed $begin NOTE(review): presumably the starting offset — confirm against the \Kafka client.
     * @param mixed $limit NOTE(review): presumably the maximum number of messages — confirm against the \Kafka client.
     *
     * @return mixed Whatever the client's consume() returns.
     */
    public static function consume($topic, $begin, $limit)
    {
        $client = static::getInstance();

        return $client->consume($topic, $begin, $limit);
    }

    /**
     * Returns the client object cached in static::$_kafka, creating it on first use.
     *
     * The client is constructed with the fixed address "localhost:9092" and
     * stored so that later calls reuse the same object.
     *
     * @return \Kafka
     */
    public static function getInstance()
    {
        // Anything that is not already an object counts as "not yet created".
        if (!is_object(static::$_kafka)) {
            static::$_kafka = new \Kafka("localhost:9092");
        }

        return static::$_kafka;
    }

    /**
     * Stores the offset for a topic, creating the KafkaOffset record if none exists.
     *
     * The record is looked up by its `name` attribute; an existing record only
     * has its `offset` overwritten, while a new record gets both `name` and
     * `offset` before being saved.
     *
     * @param mixed $topic  Value matched against (and stored in) the record's `name`.
     * @param mixed $offset Value written to the record's `offset`.
     *
     * @return mixed The result of the record's save() call
     *               (NOTE(review): presumably a bool success flag — confirm against KafkaOffset).
     */
    public static function setOffset($topic, $offset)
    {
        $record = KafkaOffset::findOne(['name' => $topic]);

        if (!$record) {
            // No stored row for this topic yet: start a fresh one keyed by the topic.
            $record = new KafkaOffset();
            $record->name = $topic;
        }

        $record->offset = $offset;

        return $record->save();
    }

    /**
     * Reads the stored offset for a topic.
     *
     * @param mixed $topic Value matched against the record's `name`.
     *
     * @return mixed The record's `offset`, or 0 when no record is found.
     */
    public static function getOffset($topic)
    {
        $record = KafkaOffset::findOne(['name' => $topic]);

        if (!$record) {
            return 0;
        }

        return $record->offset;
    }
}