diff --git a/example/consumer.js b/example/consumer.js index 0f7b2a29..05d9867c 100644 --- a/example/consumer.js +++ b/example/consumer.js @@ -8,14 +8,11 @@ var Client = kafka.Client; var client = new Client(); var topics = [ - {topic: 'topic2'}, {topic: 'topic1'}, - {topic: 't2'}, - {topic: 'topic3'} ], - options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 10000 }; + options = { autoCommit: false, fromBeginning: false, fetchMaxWaitMs: 1000 }; -function createConsumer() { +function createConsumer(topics) { var consumer = new Consumer(client, topics, options); var offset = new Offset(client); consumer.on('message', function (message) { @@ -25,7 +22,6 @@ function createConsumer() { console.log('error', err); }); consumer.on('offsetOutOfRange', function (topic) { - console.log(topic); topic.maxNum = 2; offset.fetch([topic], function (err, offsets) { var min = Math.min.apply(null, offsets[topic.topic][topic.partition]); @@ -34,4 +30,4 @@ function createConsumer() { }) } -createConsumer(); +createConsumer(topics); diff --git a/example/producer.js b/example/producer.js index dc780624..a92e51aa 100644 --- a/example/producer.js +++ b/example/producer.js @@ -3,6 +3,9 @@ var kafka = require('../kafka'), Client = kafka.Client, client = new Client(); +var argv = require('optimist').argv; +var topic = argv.topic || 'topic1'; + var producer = new Producer(client); var letters = 'abcdefghijklmnopqrstuvwxyz', @@ -26,18 +29,17 @@ function createMsg() { var count = 1, rets = 0; producer.on('ready', function () { - setInterval(send, 1000); + //setInterval(send, 1000); + send(); }); function send() { for (var i = 0; i < count; i++) { producer.send([ - {topic: 'topic1', messages: ['777777777777777' + 1 + 'coolmessage'] }, - {topic: 'topic2', messages: ['777777777777777' + 2 + 'coolmessage'] } + {topic: topic, messages: ['777777777777777' + 2 + 'coolmessage'] } ], function (err, data) { if (err) console.log(arguments); - else console.log(data); - //if (++rets === count) process.exit(); + if (++rets === count) process.exit(); }); } } diff --git a/lib/client.js b/lib/client.js index 106fc562..f30f6d12 100644 --- a/lib/client.js +++ b/lib/client.js @@ -13,7 +13,6 @@ var Client = function (connectionString, clientId) { this.connectionString = connectionString || 'localhost:2181/kafka0.8'; this.clientId = clientId || 'kafka-node-client'; this.brokers = {} - this.longPollingBrokers = {}; this.topicMetadata = {}; this.topicPartitions = {}; this.correlationId = 0; @@ -38,9 +37,7 @@ Client.prototype.connect = function () { }); zk.on('brokersChanged', function (brokerMetadata) { self.refreshBrokers(brokerMetadata); - //setTimeout(function () { - self.emit('brokersChanged'); - //}, 5000); + self.emit('brokersChanged'); }); } @@ -72,7 +69,7 @@ Client.prototype.sendFetchRequest = function (consumer, payloads, fetchMaxWaitMs Array.prototype.unshift.call(arguments, 'error') consumer.emit.apply(consumer, arguments); } - }, consumer.longpolling); + }); } Client.prototype.sendProduceRequest = function (payloads, requireAcks, ackTimeoutMs, cb) { @@ -173,7 +170,6 @@ Client.prototype.refreshBrokers = function (brokerMetadata) { return !~_.values(brokerMetadata).map(function (b) { return b.host + ':' + b.port }).indexOf(k); }).forEach(function (deadKey) { delete this.brokers[deadKey]; - delete this.longPollingBrokers[deadKey]; }.bind(this)); } @@ -189,12 +185,12 @@ Client.prototype.refreshMetadata = function (topicNames, cb) { }); } -Client.prototype.send = function (payloads, encoder, decoder, cb, longPolling) { +Client.prototype.send = function (payloads, encoder, decoder, cb) { var self = this; // payloads: [ [metadata exists], [metadta not exists] ] payloads = this.checkMetadatas(payloads); if (payloads[0].length && !payloads[1].length) { - this.sendToBroker(_.flatten(payloads), encoder, decoder, cb, longPolling); + this.sendToBroker(_.flatten(payloads), encoder, decoder, cb); return; } if (payloads[1].length) { @@ -204,17 +200,17 @@ Client.prototype.send = function (payloads, encoder, decoder, cb, longPolling) { var error = resp[1].error; if (error) return cb(error); self.updateMetadatas(resp); - self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb, longPolling); + self.sendToBroker(payloads[1].concat(payloads[0]), encoder, decoder, cb); }); } } -Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb, longPolling) { +Client.prototype.sendToBroker = function (payloads, encoder, decoder, cb) { payloads = this.payloadsByLeader(payloads); for (var leader in payloads) { var correlationId = this.nextId(); var request = encoder.call(null, this.clientId, correlationId, payloads[leader]); - var broker = this.brokerForLeader(leader, longPolling); + var broker = this.brokerForLeader(leader); if (broker.error) return cb('Leader not available', payloads[leader]); this.cbqueue[correlationId] = [decoder, cb]; broker && broker.write(request); @@ -268,8 +264,8 @@ Client.prototype.leaderByPartition = function (topic, partition) { return this.topicMetadata[topic][partition].leader; } -Client.prototype.brokerForLeader = function (leader, longPolling) { - var brokers = longPolling ? this.longPollingBrokers : this.brokers; +Client.prototype.brokerForLeader = function (leader) { + var brokers = this.brokers; // If leader is not give, choose the first broker as leader if (typeof leader === 'undefined') { if (!_.isEmpty(brokers)) { @@ -318,7 +314,6 @@ Client.prototype.createBroker = function connect(host, port) { function retry(s) { if(s.retrying) return; - console.log('retry', s.addr) s.retrying = true; s.error = true; s.retryTimer = setTimeout(function () { diff --git a/lib/consumer.js b/lib/consumer.js index f08463e7..2f31c91d 100644 --- a/lib/consumer.js +++ b/lib/consumer.js @@ -35,7 +35,6 @@ var Consumer = function (client, topics, options) { this.options = _.defaults( (options||{}), DEFAULTS ); this.ready = false; this.id = nextId(); - this.longpolling = false; this.payloads = this.buildPayloads(topics); this.connect(); } @@ -121,16 +120,11 @@ Consumer.prototype.init = function () { * Update offset info in current payloads */ Consumer.prototype.updateOffsets = function (topics) { - var offline = !this.longpolling; this.payloads.forEach(function (p) { - if (!_.isEmpty(topics[p.topic])) { - var offset = topics[p.topic][p.partition]; - p.offset = offset + 1; - offline = (offset < p.offlineOffset); - } + if (!_.isEmpty(topics[p.topic])) + p.offset = topics[p.topic][p.partition] + 1; }); - this.longpolling = !offline; if (this.options.autoCommit) this.autoCommit(); } @@ -155,17 +149,7 @@ Consumer.prototype.commit = Consumer.prototype.autoCommit = autoCommit; Consumer.prototype.fetch = function () { if (!this.ready) return; - var maxBytes = null, - fetchMaxWaitMs = this.options.fetchMaxWaitMs, - payloads = this.payloads; - - if (!this.longpolling) { - maxBytes = 1024*1024; - fetchMaxWaitMs = 100; - payloads = this.payloads.map(function (p) { return _.defaults({ maxBytes: maxBytes }, p) }); - } - - this.client.sendFetchRequest(this, payloads, fetchMaxWaitMs, this.options.fetchMinBytes); + this.client.sendFetchRequest(this, this.payloads, this.options.fetchMaxWaitMs, this.options.fetchMinBytes); } Consumer.prototype.fetchOffset = function (payloads, cb) { diff --git a/package.json b/package.json index 75804ff1..f30524c0 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "kafka-node", "description": "node client for Apache kafka, only support kafka 0.8 and above", - "version": "0.0.4", + "version": "0.0.5", "main": "kafka.js", "dependencies": { "buffermaker": "0.0.11", @@ -13,7 +13,8 @@ "devDependencies": { "mocha": "~1.12.0", "should": "~1.2.2", - "line-by-line": "~0.1.1" + "line-by-line": "~0.1.1", + "optimist": "~0.6.0" }, "repository": { "type": "git", diff --git a/test/test.consumer.js b/test/test.consumer.js index 33c5e0d7..a1025ad5 100644 --- a/test/test.consumer.js +++ b/test/test.consumer.js @@ -10,13 +10,23 @@ var client, consumer, producer, offset; function noop() { console.log(arguments) } +function offsetOutOfRange (topic, consumer) { + topic.maxNum = 2; + offset.fetch([topic], function (err, offsets) { + var min = Math.min.apply(null, offsets[topic.topic][topic.partition]); + consumer.setOffset(topic.topic, topic.partition, min); + }); +} + before(function (done) { client = new Client(); producer = new Producer(client); offset = new Offset(client); producer.on('ready', function () { producer.createTopics(['_exist_topic_1_test', '_exist_topic_2_test'], false, function (err, created) { - producer.send([{ topic: '_exist_topic_2_test', messages: 'hello kafka' }], function (err) { + producer.send([ + { topic: '_exist_topic_2_test', messages: 'hello kafka' } + ], function (err) { done(err); }); }); @@ -24,6 +34,7 @@ before(function (done) { }); describe('Consumer', function () { + describe('events', function () { it ('should emit message when get new message', function (done) { var topics = [ { topic: '_exist_topic_2_test' } ], @@ -31,6 +42,9 @@ describe('Consumer', function () { var consumer = new Consumer(client, topics, options); var count = 0; consumer.on('error', noop); + consumer.on('offsetOutOfRange', function (topic) { + offsetOutOfRange.call(null, topic, this); + }); consumer.on('message', function (message) { message.topic.should.equal('_exist_topic_2_test'); //message.value.should.equal('hello kafka'); @@ -128,15 +142,21 @@ describe('Consumer', function () { describe('#commit', function () { it('should commit offset of current topics', function (done) { - var options = { autoCommit: true, groupId: '_groupId_commit_test' }, - topics = [{ topic: '_exist_topic_2_test' }]; + var topics = [ { topic: '_exist_topic_2_test' } ], + options = { autoCommit: false, groupId: '_groupId_commit_test' }; + var consumer = new Consumer(client, topics, options); var count = 0; + consumer.on('error', noop); + consumer.on('offsetOutOfRange', function (topic) { + offsetOutOfRange.call(null, topic, this); + }); consumer.on('message', function (message) { consumer.commit(true, function (err) { if (!err && count++ === 0) done(err); }); }); + }); });