adbase Kafka SDK(使用方法见该模块的描述信息)
More...
adbase Kafka SDK(使用方法见该模块的描述信息)
- 使用方法
- Note
- 请勿直接引用模块内部的各个头文件;使用 adbase::Kafka 模块时,请统一引用 <adbase/Kafka.hpp> 文件
- Example for producer
#include <signal.h>
class SendMessage {
public:
SendMessage() :
_index(0) {
}
~SendMessage() {}
bool send(std::string& topicName,
int* partId,
adbase::Buffer& message, uint64_t* ackCode) {
topicName = "test";
message.
append(
"TEST" + std::to_string(_index));
*partId = rand() % 5;
*ackCode = _index;
if (_index == 10) {
return false;
}
_index++;
std::this_thread::sleep_for(std::chrono::seconds(1));
return true;
}
void ackCallback(uint64_t ackCode) {
LOG_INFO <<
"Ack send ackCode: " << ackCode;
}
void errorCallback(uint64_t ackCode) {
LOG_ERROR <<
"Error send ackCode: " << ackCode;
}
private:
uint64_t _index;
};
/// Signal handler: destroys the global event loop and producer, then
/// terminates the process with exit status 0.
///
/// NOTE(review): `delete` and `exit()` are not async-signal-safe per POSIX;
/// confirm this is acceptable for the example.
static void killSignal(const int sig) {
    static_cast<void>(sig);  // signal number is not used

    // Deleting a null pointer is a no-op, so no explicit null checks are needed.
    delete gEventLoop;
    delete gProducer;

    exit(0);
}
/// Signal handler placeholder for configuration reload; intentionally does
/// nothing with the received signal number.
static void reloadConf(const int /*sig*/) {
}
/// Installs the signal dispositions used by the example:
///  - SIGPIPE is ignored;
///  - SIGINT, SIGQUIT, SIGTERM, SIGHUP and SIGSEGV are routed to killSignal;
///  - SIGUSR1 is routed to reloadConf.
///
/// SIGKILL is deliberately not registered: it cannot be caught, blocked or
/// ignored, so signal(SIGKILL, ...) always fails with EINVAL.
static void registerSignal() {
    signal(SIGPIPE, SIG_IGN);

    // Signals that should shut the process down through killSignal.
    static const int kShutdownSignals[] = {SIGINT, SIGQUIT, SIGTERM, SIGHUP, SIGSEGV};
    for (const int signo : kShutdownSignals) {
        signal(signo, killSignal);
    }

    signal(SIGUSR1, reloadConf);
}
int main(void) {
registerSignal();
SendMessage sendMessage;
gProducer->
setSendHandler(std::bind(&SendMessage::send, &sendMessage, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
gProducer->
setAckHandler(std::bind(&SendMessage::ackCallback, &sendMessage, std::placeholders::_1));
gProducer->
setErrorHandler(std::bind(&SendMessage::errorCallback, &sendMessage, std::placeholders::_1));
return 0;
}
- Example for consumer
#include <signal.h>
class PullMessage {
public:
PullMessage() {}
~PullMessage() {}
bool pull(
const std::string& topicName,
int partId, uint64_t offset,
const adbase::Buffer& message) {
(void)topicName;
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
return true;
}
(void)consumer;
LOG_INFO <<
"Stats:" << stats.substr(0, 1024);
}
};
/// Signal handler: destroys the global event loop and consumer, then
/// terminates the process with exit status 0.
///
/// NOTE(review): `delete` and `exit()` are not async-signal-safe per POSIX;
/// confirm this is acceptable for the example.
static void killSignal(const int sig) {
    static_cast<void>(sig);  // signal number is not used

    // Deleting a null pointer is a no-op, so no explicit null checks are needed.
    delete gEventLoop;
    delete gConsumer;

    exit(0);
}
/// Signal handler placeholder for configuration reload; intentionally does
/// nothing with the received signal number.
static void reloadConf(const int /*sig*/) {
}
/// Installs the signal dispositions used by the example:
///  - SIGPIPE is ignored;
///  - SIGINT, SIGQUIT, SIGTERM, SIGHUP and SIGSEGV are routed to killSignal;
///  - SIGUSR1 is routed to reloadConf.
///
/// SIGKILL is deliberately not registered: it cannot be caught, blocked or
/// ignored, so signal(SIGKILL, ...) always fails with EINVAL.
static void registerSignal() {
    signal(SIGPIPE, SIG_IGN);

    // Signals that should shut the process down through killSignal.
    static const int kShutdownSignals[] = {SIGINT, SIGQUIT, SIGTERM, SIGHUP, SIGSEGV};
    for (const int signo : kShutdownSignals) {
        signal(signo, killSignal);
    }

    signal(SIGUSR1, reloadConf);
}
int main(void) {
registerSignal();
PullMessage pullMessage;
gConsumer->
setMessageHandler(std::bind(&PullMessage::pull, &pullMessage, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3, std::placeholders::_4));
gConsumer->
setStatCallback(std::bind(&PullMessage::stat, &pullMessage, std::placeholders::_1,
std::placeholders::_2));
return 0;
}