• Jump To … +
    PorthandlerConnection.coffee ProducerClass.coffee helpers.coffee
  • ProducerClass.coffee

  • ¶

    coffeelint: disable=max_line_length

  • ¶

    require dependencies

    zmq = require 'zmq'
    _ = require "lodash"
    EventEmitter = require("events").EventEmitter
    sockets = require 'omg-zmqsockets'
    helpers = require './helpers'
    configloader = require 'omg-configloader'
    PorthandlerConnection = require './PorthandlerConnection'
  • ¶

    get curveKeys for producer and server

    # CURVE key material. Note that `keys` is invoked as a function, unlike the
    # `zmq` and `producers` config sections, which are read as plain properties.
    # The three values are assigned to the curve_publickey / curve_secretkey /
    # curve_serverkey options of the sockets created with `zmq.socket` below.
    {producerPublicKey, serverPublicKey, producerPrivateKey} = configloader.keys()
  • ¶

    set configs of brokers

    # Per-broker settings; the get*Port helpers read `transport`, `address` and
    # one port field (`sub`, `pub`, `rep`, `pull`) from these objects.
    {reqrepbroker, pushpullbroker, pubsubbroker} = configloader.zmq
    # Fallbacks used when a broker entry has no `address` / `transport` of its own.
    brokerStandardAddress = configloader.zmq.standardAddress
    brokerStandardTransport = configloader.zmq.standardTransport
  • ¶

    set standard config of producers

    # Producer section of the config. It is indexed by producer name in the
    # Producer constructor (`producerconfig[@name]`) to look up optional
    # per-producer `transport`, `address`, `pull` and `dealer` overrides.
    producerconfig = configloader.producers
    # Defaults applied when a producer has no entry or no override of its own.
    producerStandardAddress = producerconfig.standardAddress
    producerStandardTransport = producerconfig.standardTransport
  • ¶

    returns address of the sub socket of the pub/sub broker (the endpoint the shared publish socket connects to)

    # Endpoint ("transport://address:port") of the pub/sub broker's `sub` port.
    # Broker-specific transport/address win over the broker-wide defaults.
    # NOTE: differs from `getPubBrokerPort` only by the case of one letter; that
    # one reads the `pub` port instead.
    getPubbrokerPort = ->
      {transport, address, sub} = pubsubbroker
      "#{transport or brokerStandardTransport}://#{address or brokerStandardAddress}:#{sub}"
  • ¶

    returns address to publish socket of pub/sub broker

    # Endpoint ("transport://address:port") of the pub/sub broker's `pub` port.
    # Broker-specific transport/address win over the broker-wide defaults.
    getPubBrokerPort = ->
      {transport, address, pub} = pubsubbroker
      "#{transport or brokerStandardTransport}://#{address or brokerStandardAddress}:#{pub}"
  • ¶

    returns address to reply socket of req/rep broker

    # Endpoint ("transport://address:port") of the req/rep broker's `rep` port.
    # Broker-specific transport/address win over the broker-wide defaults.
    getRepbrokerPort = ->
      scheme = reqrepbroker.transport or brokerStandardTransport
      host = reqrepbroker.address or brokerStandardAddress
      [scheme, "://", host, ":", reqrepbroker.rep].join ""
  • ¶

    returns address to pull socket of push/pull broker

    # Endpoint ("transport://address:port") of the push/pull broker's `pull` port.
    # Broker-specific transport/address win over the broker-wide defaults.
    getPullbrokerPort = ->
      scheme = pushpullbroker.transport or brokerStandardTransport
      host = pushpullbroker.address or brokerStandardAddress
      [scheme, "://", host, ":", pushpullbroker.pull].join ""
  • ¶

    One publish socket for all producers

    One publish socket is enough, because there are no replies and the connection is always to the pub/sub broker

    # Module-level "pub" socket, created once when this file is loaded and used
    # by publishDiff, sendData, sendError and sendStatus of every Producer.
    publishSocket = zmq.socket "pub"
    # CURVE options are assigned before `connect` is called.
    publishSocket.curve_serverkey = serverPublicKey
    publishSocket.curve_publickey = producerPublicKey
    publishSocket.curve_secretkey = producerPrivateKey
    # getPubbrokerPort (lower-case "b") resolves to the broker's `sub` port.
    publishSocket.connect getPubbrokerPort()
  • ¶

    One push socket for all producers

    One push socket is enough, because there are no replies and the connection is always to the push/pull broker

    # Module-level "push" socket, created once when this file is loaded and used
    # by sendCommand of every Producer.
    pushSocket = zmq.socket "push"
    # CURVE options are assigned before `connect` is called.
    pushSocket.curve_serverkey = serverPublicKey
    pushSocket.curve_publickey = producerPublicKey
    pushSocket.curve_secretkey = producerPrivateKey
    # Connects to the push/pull broker's `pull` port.
    pushSocket.connect getPullbrokerPort()
    
    # Decode the optional JSON argument frame of an incoming command/request.
    #
    # rawArgs - string content of the third message frame (may be empty/undefined).
    #
    # Returns an array that is safe to hand to `Function::apply`: `[]` when no
    # arguments were sent, the parsed array when the frame holds a JSON array,
    # and a one-element array for any other JSON value.
    # Throws whatever `JSON.parse` throws on malformed input; callers handle it.
    parseCommandArgs = (rawArgs) ->
      return [] unless rawArgs
      parsed = JSON.parse rawArgs
      return [] unless parsed?
      if _.isArray(parsed) then parsed else [parsed]

    # Base class for producers.
    #
    # An instance
    #   * pulls commands (frames: producerName, command, JSON args) and invokes
    #     the method named `command` on itself,
    #   * answers requests on a rep socket the same way, appending a reply
    #     callback `(err, data)` as last argument,
    #   * publishes data/status/errors through the module-level publish socket,
    #   * can subscribe to other producers and send requests/commands to them.
    #
    # NOTE(review): any function-valued property of the instance (including
    # inherited EventEmitter methods) can be invoked by name from the network.
    class Producer extends EventEmitter
      # name             - producer name; first frame of every outgoing message,
      #                    part of the debug namespace and key into the
      #                    `producers` config section.
      # stopIntervalTime - optional; when truthy, number of milliseconds without
      #                    a `subscribe` call after which `@stop` is called.
      constructor: (@name, @stopIntervalTime) ->
        debugOriginal = require("debug")("omg-#{@name}")
        # Instance-level logger; shadows the no-op `debug` on the prototype.
        @debug = (args...) ->
          args.push "(producer class)"
          debugOriginal.apply debugOriginal, args
        @debug "creating producer #{@name}"

        @lastPing = new Date()
        @ping = =>
          @lastPing = new Date()

        @cache = {}

        transport = producerconfig[@name] and producerconfig[@name].transport or producerStandardTransport
        address = producerconfig[@name] and producerconfig[@name].address or producerStandardAddress

        # setup a new connection to the pushpullbroker
        sockets.connectPort @name, "push", (port) =>
          @pullSocket = zmq.socket "pull"
          @pullSocket.curve_serverkey = serverPublicKey
          @pullSocket.curve_publickey = producerPublicKey
          @pullSocket.curve_secretkey = producerPrivateKey

          # connect to port in producerconfig file, or default producerconfig
          pullPort = producerconfig[@name] and producerconfig[@name].pull or "#{transport}://#{address}:#{port}"
          @debug "#{@name} connecting pull socket to #{pullPort}"
          @pullSocket.connect pullPort

          # handle incoming commands
          # (a splat is used instead of `arguments`, which is only the handler's
          # own argument list when `=>` compiles to a regular function)
          @pullSocket.on "message", (messageParts...) =>
            # producerName = messageParts[0] and messageParts[0].toString()
            command = messageParts[1] and messageParts[1].toString()
            try
              args = parseCommandArgs(messageParts[2] and messageParts[2].toString())
            catch e
              # A malformed frame must not throw out of the socket event handler.
              @debug "#{@name} ignoring command '#{command}': invalid JSON arguments", e
              return

            # Invoke with the producer as `this` so that unbound (`->`) methods
            # can use @name, @cache, @debug, ...
            if typeof @[command] is "function"
              @[command].apply this, args

        # setup a new connection to reqrepbroker. Connect to the producers dealer port
        portHandlerConnection = new PorthandlerConnection()
        portHandlerConnection.getPortFor @name, "dealer", (port) =>
          # close connection to porthandler
          portHandlerConnection.close()
          portHandlerConnection = null

          # connect to port in producerconfig file, or default producerconfig
          addressToDealer = producerconfig[@name] and producerconfig[@name].dealer or "#{transport}://#{address}:#{port}"
          @debug "#{@name} connecting rep socket to port #{addressToDealer}"

          @replySocket = new sockets.ConnectedSocket @name, "rep", addressToDealer

          # create callback function that sends reply
          # frames: producer name, err, status, JSON data
          replyCallback = (err, data) =>
            data = helpers.objectify data
            data = (if data then JSON.stringify(data) else "")
            err = err or ""
            status = @status or ""
            @replySocket.send [@name, err, status, data]

          # handle incoming requests
          @replySocket.on "message", (messageParts...) =>
            # producerName = messageParts[0] and messageParts[0].toString()
            command = messageParts[1] and messageParts[1].toString()
            try
              args = parseCommandArgs(messageParts[2] and messageParts[2].toString())
            catch e
              # Every request gets an answer, even one with an unparsable
              # argument frame, so the requester is not left waiting.
              @debug "#{@name} got a request with invalid JSON arguments", e
              replyCallback "#{@name} could not parse arguments for '#{command}'", ""
              return

            args.push replyCallback

            @debug "#{@name} got a request"
            if typeof @[command] is "function"
              @[command].apply this, args
            else
              replyCallback(null, "#{@name} has no method '#{command}'")

        @subscribeSocket = zmq.socket "sub"
        @subscribeSocket.curve_serverkey = serverPublicKey
        @subscribeSocket.curve_publickey = producerPublicKey
        @subscribeSocket.curve_secretkey = producerPrivateKey
        @subscribeSocket.connect getPubBrokerPort()

        @stopInterval = false
        if @stopIntervalTime
          @subscriptionTime = new Date()
          @debug "will stop if there are no subscribers for #{@stopIntervalTime / 1000} seconds"
          @stopInterval = setInterval @checkSubscriptionState, 2000

      # Timer callback (every 2000 ms when `stopIntervalTime` was given): calls
      # `@stop` once more than `stopIntervalTime` ms passed since the last
      # `subscribe`, then disarms itself until `subscribe` is called again.
      checkSubscriptionState: =>
        return unless @subscriptionTime
        diffTime = new Date() - @subscriptionTime
        if diffTime > @stopIntervalTime
          @debug "canceling subscription by calling 'stop'"
          unless @stop
            @debug "I don't have a stop method!"
            return
          @stop()
          @subscriptionTime = false

      # Hand the cached data to `callback(err, data)`.
      # Calls back exactly once: with the cache, or with "" if the cache is falsy.
      getStatus: (callback) =>
        unless callback
          @debug "no callback specified"
          return
        if @cache
          @debug "sending from cache"
          callback null, @cache
        else
          @debug "Has nothing in cache"
          callback null, ""

      # Publish only what changed in `data` relative to the cache and merge the
      # new data into the cache. Nothing is sent when `helpers.getChanges`
      # returns a falsy value.
      publishDiff: (data) ->
        diff = helpers.getChanges @cache, data
        @cache = _.merge @cache, data if diff
        status = @status or ""
        if diff
          @debug "#{@name} publishing diff"
          publishSocket.send [@name, "", status, JSON.stringify(diff)]

      # Publish `data`.
      #
      # data  - payload; passed through `helpers.objectify` first.
      # reset - when truthy the cache is replaced and the full data is
      #         published, otherwise only the diff against the cache is sent.
      sendData: (data, reset)->
        data = helpers.objectify data
        unless reset
          @publishDiff data
        else
          @debug "Publishing full data"
          @cache = data #_.merge @cache, data
          status = @status or ""
          publishSocket.send [@name, "", status, JSON.stringify(data)]

      # Subscribe to the messages of another producer.
      #
      # producer - topic prefix to subscribe to (the producer's name).
      # callback - called as `callback(err, status, data)` for each message
      #            whose first frame starts with `producer`.
      #
      # All subscriptions of an instance share one sub socket, so each listener
      # filters on the first frame; otherwise every callback would also fire for
      # the messages of every other subscribed producer.
      subscribeTo: (producer, callback)->
        @debug @name, "subscribing to #{producer}"
        listener = (frames...) ->
          msg = helpers.parseZmqMessage.apply this, frames
          # same prefix semantics as the socket-level subscription
          return unless String(msg[0]).indexOf(producer) is 0
          callback msg[1], msg[2], msg[3] # = err, status, data
        # remembered so that unsubscribeFrom can detach the listener again
        (@_subscriptions ?= []).push {producer, listener}
        @subscribeSocket.subscribe producer
        @subscribeSocket.on "message", listener

      # Send a request to another producer and report the reply.
      #
      # reqObject - {producer, action, args}; `args` is optional and is sent
      #             JSON-encoded as third frame.
      # callback  - `callback(err, status, data)` on reply, or
      #             `callback("Request timed out")` on timeout. Called at most
      #             once.
      # timeoutMs - optional, defaults to the historical 10 * 10000 ms.
      #             NOTE(review): that is 100 s; 10 * 1000 may have been intended.
      #
      # A dedicated req socket is opened per request and closed afterwards.
      requestData: (reqObject, callback, timeoutMs = 10 * 10000) ->
        {producer, action, args} = reqObject
        @debug "requesting data from #{producer} (#{action})"

        # connect new request socket
        requestSocket = new sockets.ConnectedSocket @name, "req", getRepbrokerPort()

        timer = null
        settled = false

        closeSocket = ->
          return unless requestSocket
          requestSocket.close()
          requestSocket = null

        requestSocket.once "message", (frames...) =>
          return if settled
          settled = true
          # do not keep the event loop busy until the timeout would have fired
          clearTimeout timer
          @debug "Got reply from #{producer}"
          try
            msg = helpers.parseZmqMessage.apply this, frames
            callback msg[1], msg[2], msg[3] # = err, status, data
          finally
            # close the socket even if parsing or the callback throws
            @debug "closing req socket to #{producer}"
            closeSocket()

        if args
          requestSocket.send [producer, action, JSON.stringify(args)]
        else
          requestSocket.send [producer, action]

        requestTimeout = ->
          return if settled
          settled = true
          closeSocket()
          callback "Request timed out"

        timer = setTimeout requestTimeout, timeoutMs

      # Fire-and-forget command to another producer via the shared push socket.
      #
      # reqObject - {producer, action, args}; `args` is optional and is sent
      #             JSON-encoded as third frame.
      sendCommand: (reqObject) ->
        {producer, action, args} = reqObject
        @debug "sending command to #{producer} (#{action})"

        if args
          pushSocket.send [producer, action, JSON.stringify(args)]
        else
          pushSocket.send [producer, action]

      # Publish an error: frames are the producer name followed by `e`.
      sendError: (e)->
        @debug "error", e
        publishSocket.send [@name, e]

      # Record that there is (still) a subscriber; restarts the inactivity
      # window evaluated by checkSubscriptionState.
      subscribe: =>
        @subscriptionTime = new Date()
        # @debug "new subscriptionTime:", @subscriptionTime

      # No-op default; the constructor installs the real logger on the instance.
      debug: ->
        # console.log "debug", arguments

      # Publish a status-only message.
      # NOTE(review): `@status` takes precedence over the `status` argument, so
      # the argument is only used while `@status` is falsy - confirm this is
      # intended.
      sendStatus: (status) ->
        status = @status or status or ""
        publishSocket.send [@name, "", status]


      # Request handler that simply echoes `data` back through `callback`.
      replyData: (data, callback)->
        callback null, data

      # Undo every `subscribeTo producer` made on this instance: detaches the
      # message listeners and drops the socket-level subscription once per
      # earlier subscribe call. Calling it without a producer does nothing,
      # which is what the former empty implementation did.
      unsubscribeFrom: (producer) ->
        return unless producer? and @_subscriptions
        remaining = []
        for entry in @_subscriptions
          if entry.producer is producer
            @subscribeSocket.removeListener "message", entry.listener
            @subscribeSocket.unsubscribe producer
          else
            remaining.push entry
        @_subscriptions = remaining
        return
    
    
    
    
    
    
    module.exports = Producer
  • ¶

    return

  • ¶

    serverPublicKeyEncoded = fs.readFileSync “/data/curvekeys/serverPublic” producerPublicKeyEncoded = fs.readFileSync “/data/curvekeys/producerPublic” producerPrivateKeyEncoded = fs.readFileSync “/data/curvekeys/producerPrivate”

  • ¶

    serverPublicKeyEncoded = serverPublicKeyEncoded.toString() producerPublicKeyEncoded = producerPublicKeyEncoded.toString() producerPrivateKeyEncoded = producerPrivateKeyEncoded.toString()

  • ¶

    serverPublicKey = z85.decode(serverPublicKeyEncoded) producerPublicKey = z85.decode(producerPublicKeyEncoded) producerPrivateKey = z85.decode(producerPrivateKeyEncoded)

  • ¶

    pullSocket = zmq.socket ‘pull’

    pullSocket.curve_publickey = producerPublicKey

    pullSocket.curve_secretkey = producerPrivateKey

    pullSocket.curve_serverkey = serverPublicKey

    pullSocket.connect “#{host}#{aggConfig.push}”

  • ¶

    class Producer extends EventEmitter constructor: (@name, @stopIntervalTime) -> amountOfProducers++

    ###
     There has to be one socket per instance to handle returned messages individually and not block the socket
    ###
    #require and connect 'req' socket producerManager
    @requestSocket = zmq.socket('req')
    @requestSocket.curve_publickey = producerPublicKey
    @requestSocket.curve_secretkey = producerPrivateKey
    @requestSocket.curve_serverkey = serverPublicKey
    @requestSocket.connect "#{host}#{aggConfig.reqRep}"
    
  • ¶
    #require and connect 'sub' socket to node.js producerManager
    @subSocket = zmq.socket('sub')
    @subSocket.curve_publickey = producerPublicKey
    @subSocket.curve_secretkey = producerPrivateKey
    @subSocket.curve_serverkey = serverPublicKey
    @subSocket.connect "#{host}#{aggConfig.pubSub}"
    
  • ¶
    @emitter = new EventEmitter()
    
  • ¶
    @debug = require("debug")("omg-#{@name}_producerClass")
    @stopInterval = false
    # if @stopIntervalTime
    #   @stopInterval = setInterval @checkSubscriptionState, 5000
    @subscriptionTime = false
    @cache = {}
    
  • ¶

    replyData: (data, callback) => replyObject = producer: @name topic: “data” data: data

  • ¶
    replyObject = objectify replyObject
    
  • ¶
    debug "no callback specified" unless callback
    callback null, replyObject
    
  • ¶

    replyError: (err, callback) => replyObject = producer: @name topic: “data” error: err

  • ¶
    replyObject = objectify replyObject
    debug "no callback specified" unless callback
    callback null, replyObject
    
  • ¶

    requestData: (requestObject, callback) => parseZmqMessageAndSend = -> msg = [] Array.prototype.slice.call(arguments).forEach (arg) -> msg.push arg.toString()

  • ¶
      data = msg[1]
      try
        data = JSON.parse(data)
      catch e
        debug e
    
  • ¶
      callback data
    
  • ¶
    @requestSocket.once "message", parseZmqMessageAndSend
    @requestSocket.send JSON.stringify(requestObject)
    
  • ¶

    sendCommand: (commandObject, callback) -> commandObject.topic = “command” pullBroker.send JSON.stringify(commandObject)

  • ¶

    subscribeTo: (producerToSubscribeTo, callback) => unless callback debug “you have to specify a callback!” return

  • ¶
    requestObject =
      producer: producerToSubscribeTo
      topic: "subscription"
      action: "subscribe"
    
  • ¶
    parseZmqMessageAndSend = ->
      msg = []
      Array.prototype.slice.call(arguments).forEach (arg) ->
        msg.push arg.toString()
    
  • ¶
      data = msg[1]
      try
        data = JSON.parse(data)
        # data = data.outputMessage
      catch e
        debug e
    
  • ¶
      callback data
    
  • ¶
    @subSocket.on "message", parseZmqMessageAndSend
    
  • ¶
    unsubscribe = (producer) =>
      if producer is requestObject.producer
        @subSocket.removeListener "message", parseZmqMessageAndSend
        @emitter.removeListener "unsubscribe", unsubscribe
    @emitter.on "unsubscribe", unsubscribe
    
  • ¶
    @subSocket.subscribe producerToSubscribeTo
    pullBroker.send JSON.stringify(requestObject)
    
  • ¶

    unsubscribeFrom: (producerToUnsubscribeFrom) => @subSocket.unsubscribe producerToUnsubscribeFrom @emitter.emit “unsubscribe”, producerToUnsubscribeFrom

  • ¶

    send: (pushObject) => pushObject.producer = @name pushObject.topic = “data”

  • ¶
    if pushObject.reset
      # @debug "resetting data of #{@name}"
      # delete @cache.data
      @cache = {}
    
  • ¶
    # @debug "1 original #{JSON.stringify(pushObject)}"
    # turn all arrays in object to objects
    pushObject = objectify pushObject
    # @debug "2 objectified #{JSON.stringify(pushObject)}"
    
  • ¶
    diff = getChanges @cache, pushObject
    # @debug "3 diff #{diff}"
    
  • ¶
    noChanges = _.isEmpty diff or not diff
    # @debug "5 noChanges: #{JSON.stringify(noChanges)}"
    
  • ¶
    # @debug "preparing data"
    # @debug "no changes" if noChanges
    
  • ¶
    if diff
      diff.producer = @name
      diff.topic = "data"
    unless noChanges
      @debug "sending data"
      pullBroker.send JSON.stringify(diff)
    
  • ¶
    @cache = _.merge(@cache, pushObject) unless pushObject.reset
    
  • ¶

    sendData: (data, reset) => pushObject = data: data status: “running”

  • ¶
    pushObject.reset = true if reset
    
  • ¶
    @send pushObject
    
  • ¶

    sendStatus: (status) => pushObject = status: status @send pushObject

  • ¶

    sendError: (e) => pushObject = error: e @send pushObject

  • ¶

    checkSubscriptionState: => return unless @subscriptionTime diffTime = new Date() - @subscriptionTime if diffTime > @stopIntervalTime @debug “canceling subscription by calling ‘stop’” @stop() @subscriptionTime = false

  • ¶

    subscribe: => @subscriptionTime = new Date() return

  • ¶

    getStatus: (callback) => if callback if @cache @cache.producer = @name @debug “sending from cache” callback null, @cache unless @cache @debug “Has nothing in cache” callback null, “” else @debug “no callback specified”

  • ¶

    module.exports = Producer