command = messageParts[1] and messageParts[1].toString()
args = messageParts[2] and messageParts[2].toString()
if args
args = JSON.parse(args)
else
args = []
args.push replyCallback
@debug "#{@name} got a request"
if @[command]
@[command].apply(@[command], args)
else
replyCallback(null, "#{@name} has no method '#{command}'")
@subscribeSocket = zmq.socket "sub"
@subscribeSocket.curve_serverkey = serverPublicKey
@subscribeSocket.curve_publickey = producerPublicKey
@subscribeSocket.curve_secretkey = producerPrivateKey
@subscribeSocket.connect getPubBrokerPort()
@stopInterval = false
if @stopIntervalTime
@subscriptionTime = new Date()
@debug "will stop if there are no subscribers for #{@stopIntervalTime / 1000} seconds"
@stopInterval = setInterval @checkSubscriptionState, 2000
# Checks how long ago @subscriptionTime was set and calls @stop once that
# age exceeds @stopIntervalTime (both compared in milliseconds, since
# subtracting two Dates yields milliseconds).
#
# Does nothing while @subscriptionTime is falsy. After a successful @stop()
# the timestamp is set to false, so later invocations return immediately.
# If the object has no `stop` member, only a debug line is written and the
# timestamp is left untouched.
checkSubscriptionState: =>
  return unless @subscriptionTime
  elapsedMs = new Date() - @subscriptionTime
  # Still inside the allowed window: nothing to cancel yet.
  return unless elapsedMs > @stopIntervalTime
  @debug "canceling subscription by calling 'stop'"
  if @stop
    @stop()
    @subscriptionTime = false
  else
    @debug "I don't have a stop method!"
    return
# Hands the currently cached data to `callback` in (err, result) form.
#
# callback - function (err, data). Called with (null, @cache) when the cache
#            is truthy, otherwise with (null, "").
#
# When no callback is given, only a debug line is written.
getStatus: (callback) =>
  return @debug "no callback specified" unless callback
  # Snapshot the cache and branch with if/else: the callback must fire exactly
  # once, even if invoking it ends up changing @cache.
  cached = @cache
  if cached
    @debug "sending from cache"
    callback null, cached
  else
    @debug "Has nothing in cache"
    callback null, ""
# Publishes only the difference between the cached data and `data`.
#
# data - the new data; compared against @cache via helpers.getChanges.
#
# When helpers.getChanges returns a falsy value nothing is merged or sent.
# Otherwise `data` is merged into the cache and the diff is sent on
# publishSocket as the frames [@name, "", status, JSON-encoded diff].
publishDiff: (data) ->
  diff = helpers.getChanges @cache, data
  return unless diff
  # Merge into an empty object when no cache exists yet, so the result does
  # not depend on how _.merge treats a null/undefined destination.
  @cache = _.merge(@cache or {}, data)
  status = @status or ""
  @debug "#{@name} publishing diff"
  publishSocket.send [@name, "", status, JSON.stringify(diff)]
# Publishes `data` after passing it through helpers.objectify.
#
# data  - the data to publish.
# reset - when falsy, delegates to @publishDiff (diff-only publishing);
#         when truthy, replaces @cache with the converted data and sends the
#         full payload as the frames [@name, "", status, JSON-encoded data].
sendData: (data, reset)->
  payload = helpers.objectify data
  return @publishDiff payload unless reset
  @debug "Publishing full data"
  @cache = payload
  currentStatus = @status or ""
  publishSocket.send [@name, "", currentStatus, JSON.stringify(payload)]
# Subscribes @subscribeSocket to `producer` and forwards incoming messages.
#
# producer - value passed to the socket's `subscribe` call.
# callback - invoked for every "message" event with elements 1, 2 and 3 of
#            whatever helpers.parseZmqMessage returns for the raw frames.
#
# NOTE(review): every call registers one more "message" listener on the same
# socket, and the listener does not check which producer a message came from.
# Presumably callers subscribe once per instance; confirm before calling this
# for several producers.
subscribeTo: (producer, callback)->
  @debug @name, "subscribing to #{producer}"
  socket = @subscribeSocket
  socket.subscribe producer
  socket.on "message", (frames...) ->
    # `this` is left as supplied by the emitter, as in a plain function listener.
    parsed = helpers.parseZmqMessage.apply this, frames
    callback parsed[1], parsed[2], parsed[3]
requestData: (reqObject, callback) ->
{producer, action, args} = reqObject
@debug "requesting data from #{producer} (#{action})"