Question: Websphere MQ as a data source for Apache Spark Streaming


I am looking into the possibility of using Websphere MQ as a data source for Spark Streaming, because one of our use cases requires it. I have learned that MQTT is a protocol that supports communication from MQ data structures, but since I am new to Spark Streaming, I need some working examples of this. Has anyone tried connecting MQ to Spark Streaming? Please suggest the best way to do it.


13
2018-05-25 08:58




Answers:


So, I am posting here the working code for a CustomMQReceiver that connects to Websphere MQ and reads data:

import java.util.Enumeration;

import javax.jms.JMSException;
import javax.jms.Session;

import com.google.gson.Gson;
import com.ibm.jms.JMSMessage;
import com.ibm.mq.jms.JMSC;
import com.ibm.mq.jms.MQQueue;
import com.ibm.mq.jms.MQQueueBrowser;
import com.ibm.mq.jms.MQQueueConnection;
import com.ibm.mq.jms.MQQueueConnectionFactory;
import com.ibm.mq.jms.MQQueueSession;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.receiver.Receiver;

public class CustomMQReciever extends Receiver<String> {
    String host = null;
    int port = -1;
    String qm = null;      // queue manager name
    String qn = null;      // queue name
    String channel = null;

    // Transient: not serialized when the receiver is shipped to a worker
    transient Gson gson = new Gson();
    transient MQQueueConnection qCon = null;
    transient Enumeration enumeration = null;

    public CustomMQReciever(String host, int port, String qm, String channel, String qn) {
        super(StorageLevel.MEMORY_ONLY_2());
        this.host = host;
        this.port = port;
        this.qm = qm;
        this.qn = qn;
        this.channel = channel;
    }

    @Override
    public void onStart() {
        // Start the thread that receives data over a connection
        new Thread() {
            @Override
            public void run() {
                try {
                    initConnection();
                    receive();
                } catch (JMSException ex) {
                    ex.printStackTrace();
                }
            }
        }.start();
    }

    @Override
    public void onStop() {
        // There is nothing much to do, as the thread calling receive()
        // is designed to stop by itself once isStopped() returns true
    }

    /** Read messages from the MQ browser until the receiver is stopped or no messages remain */
    private void receive() {
        System.out.println("Started receiving messages from MQ");

        try {
            JMSMessage receivedMessage = null;

            while (!isStopped() && enumeration.hasMoreElements()) {
                receivedMessage = (JMSMessage) enumeration.nextElement();
                // convertStreamToString is a helper that is not shown in this post;
                // it is expected to return the message body as a String
                String userInput = convertStreamToString(receivedMessage);
                //System.out.println("Received data :'" + userInput + "'");
                store(userInput);
            }

            // Restart in an attempt to connect again when server is active again
            //restart("Trying to connect again");

            stop("No More Messages To read !");
            qCon.close();
            System.out.println("Queue Connection is Closed");
        } catch (Exception e) {
            e.printStackTrace();
            restart("Trying to connect again");
        } catch (Throwable t) {
            // restart if there is any other error
            restart("Error receiving data", t);
        }
    }

    /** Connect to the queue manager and open a browser on the queue */
    public void initConnection() throws JMSException {
        MQQueueConnectionFactory conFactory = new MQQueueConnectionFactory();
        conFactory.setHostName(host);
        conFactory.setPort(port);
        conFactory.setTransportType(JMSC.MQJMS_TP_CLIENT_MQ_TCPIP);
        conFactory.setQueueManager(qm);
        conFactory.setChannel(channel);

        qCon = (MQQueueConnection) conFactory.createQueueConnection();
        // Non-transacted session; Session.AUTO_ACKNOWLEDGE is the constant behind the literal 1
        MQQueueSession qSession = (MQQueueSession) qCon.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
        MQQueue queue = (MQQueue) qSession.createQueue(qn);
        MQQueueBrowser browser = (MQQueueBrowser) qSession.createBrowser(queue);
        qCon.start();

        enumeration = browser.getEnumeration();
    }

    @Override
    public StorageLevel storageLevel() {
        return StorageLevel.MEMORY_ONLY_2();
    }
}
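
One thing worth noting about the receive loop: it ends as soon as the browser's enumeration has no more elements, so the receiver reads what it finds on the queue and then stops itself. A JMS queue browser also only looks at messages and does not remove them from the queue. The control flow can be sketched with plain Java collections standing in for the MQ browser. This is only an illustration under those assumptions: `BrowseLoopSketch`, `drain`, the stop flag, and the in-memory message list are hypothetical names that do not come from the MQ or Spark APIs.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for the receiver's browse loop; no Spark or MQ classes are used.
class BrowseLoopSketch {

    // Hands each element to the sink until the enumeration is exhausted
    // or the stop flag is set. Returns how many messages were handed over.
    static int drain(Enumeration<String> messages, AtomicBoolean stopped, Consumer<String> sink) {
        int count = 0;
        while (!stopped.get() && messages.hasMoreElements()) {
            sink.accept(messages.nextElement());
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // The list plays the role of the queue contents; stored plays the role of store(...)
        List<String> stored = new ArrayList<>();
        AtomicBoolean stopped = new AtomicBoolean(false);
        Enumeration<String> queue = Collections.enumeration(List.of("m1", "m2", "m3"));

        int n = drain(queue, stopped, stored::add);
        System.out.println(n + " messages stored: " + stored);
    }
}
```

If the job should keep running and pick up messages that arrive later, the loop would need to wait for new messages instead of exiting when the enumeration runs dry.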

3
2017-08-18 06:33



I believe you can use JMS to connect to Websphere MQ, and Apache Camel can be used to make that connection. You can create a custom Receiver like this (note that this pattern can also be used without JMS):

import java.util.Properties
import javax.naming.InitialContext

import org.apache.camel.{Exchange, Processor}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.component.jms.JmsComponent
import org.apache.camel.impl.DefaultCamelContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class JMSReceiver(topicName: String, cf: String, jndiProviderURL: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER) with Serializable {
  // Transient as this will get passed to the Workers from the Driver
  @transient
  var camelContextOption: Option[DefaultCamelContext] = None

  // clientId is not defined in the original post; "???" is a placeholder to replace
  val clientId = "???"

  def onStart(): Unit = {
    camelContextOption = Some(new DefaultCamelContext())
    val camelContext = camelContextOption.get
    val env = new Properties()
    env.setProperty("java.naming.factory.initial", "???")
    env.setProperty("java.naming.provider.url", jndiProviderURL)
    env.setProperty("com.webmethods.jms.clientIDSharing", "true")
    val namingContext = new InitialContext(env) // using the properties to create the context

    // Look up the connection factory
    val connectionFactory = namingContext.lookup(cf).asInstanceOf[javax.jms.ConnectionFactory]
    camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory))

    val builder = new RouteBuilder() {
      override def configure(): Unit = {
        from(s"jms://topic:$topicName?jmsMessageType=Object&clientId=$clientId&durableSubscriptionName=${topicName}_SparkDurable&maxConcurrentConsumers=10")
          .process(new Processor() {
            override def process(exchange: Exchange): Unit = {
              exchange.getIn.getBody match {
                case s: String => store(s)
                case _ => () // ignore bodies that are not Strings instead of failing with a MatchError
              }
            }
          })
      }
    }
    camelContext.addRoutes(builder)
    camelContext.start()
  }

  def onStop(): Unit = camelContextOption.foreach(_.stop())
}

You can then create a DStream from your events like this:

val myDStream = ssc.receiverStream(new JMSReceiver("MyTopic", "MyContextFactory", "MyJNDI"))

0
2017-08-18 08:47