mongodb->getCollection(KafkaProducer::LOG);
        while (true) {
            echo 'offset:', $offset, PHP_EOL;
            $logs = KafkaConsumer::process(KafkaProducer::TOPIC_ERROR, $offset, 10);
            if (!$logs) {
                echo 'empty. sleep for a while...', PHP_EOL;
                sleep(5);
                continue;
            }
            foreach ($logs as $log) {
                $log = json_decode($log, true);
                if (empty($log)) continue;
                $ret = $colError->insert($log);
                echo $ret ? 'done' : 'error', PHP_EOL;
            }
        }
    }

    /**
     * Endlessly copies Kafka log messages of one level into the MongoDB log collection.
     *
     * Each pass fetches up to 10 messages starting at the stored offset, JSON-decodes
     * them and inserts them into the KafkaProducer::LOG collection. The offset is
     * persisted under the name "kafka_<level>_offset" after every handled message.
     * The loop never returns; it sleeps 5 seconds whenever nothing is available.
     *
     * @param mixed $level One of KafkaProducer::LEVEL_ERROR, LEVEL_WARNING or LEVEL_NOTICE.
     *
     * @throws \Exception When $level is not one of the supported levels.
     */
    public function actionLog($level = KafkaProducer::LEVEL_ERROR)
    {
        $supportedLevels = [
            KafkaProducer::LEVEL_ERROR,
            KafkaProducer::LEVEL_WARNING,
            KafkaProducer::LEVEL_NOTICE,
        ];
        // NOTE(review): comparison is intentionally left non-strict; the types of the
        // level constants and of the incoming argument are not visible here.
        if (!in_array($level, $supportedLevels)) {
            throw new \Exception('暂无配置此错误级别:' . $level);
        }

        $limit = 10;
        $offsetName = 'kafka_' . $level . '_offset';
        $offset = ZKafka::getOffset($offsetName);
        $colError = Yii::$app->mongodb->getCollection(KafkaProducer::LOG);

        while (true) {
            echo 'offset:', $offset, PHP_EOL;
            $logs = KafkaConsumer::process($level, $offset, $limit);
            if (!$logs) {
                echo 'empty. sleep for a while...', PHP_EOL;
                sleep(5);
                continue;
            }

            // Assumes process() returns consecutive messages starting at $offset, so
            // the offset must move by exactly one per message handled, in order.
            foreach ($logs as $log) {
                $log = json_decode($log, true);
                if (empty($log)) {
                    // An undecodable/empty message can never be stored. Step over it,
                    // otherwise the saved offset lags behind and later messages of this
                    // batch are fetched and inserted a second time.
                    echo 'skip', PHP_EOL;
                    ZKafka::setOffset($offsetName, ++$offset);
                    continue;
                }

                $ret = $colError->insert($log);
                echo $ret ? 'done' : 'error', PHP_EOL;
                if (!$ret) {
                    // Do not move past a message that was not stored: drop the rest of
                    // the batch, back off, and re-fetch starting from this message.
                    sleep(5);
                    break;
                }

                ZKafka::setOffset($offsetName, ++$offset);
            }
        }
    }

    /**
     * Debug helper: moves the stored error offset back by one.
     *
     * Dumps the offset before the change, the result of the write, and the offset
     * read back afterwards.
     */
    public function actionSetoffset()
    {
        $offset = ZKafka::getOffset(ZKafka::KAFKA_ERROR_OFFSET);
        d($offset);

        $ret = ZKafka::setOffset(ZKafka::KAFKA_ERROR_OFFSET, --$offset);
        d($ret);

        // Read the value back to confirm what was actually persisted.
        $offset = ZKafka::getOffset(ZKafka::KAFKA_ERROR_OFFSET);
        d($offset);
    }

    /**
     * Debug helper: produces one hard-coded sample message to the "adOffline"
     * topic and dumps the result.
     */
    public function actionCost()
    {
        // Class name spelled as everywhere else in this file; a different case can
        // break autoloading on case-sensitive filesystems.
        $ret = ZKafka::produce('adOffline', 'planId|1|xxxdasflkd');
        dd($ret);
    }
}