Saturday, 28 March 2015

Abstracting JMS over lambdas: Scala vs Java 8

Follows two implementations of a JMS abstraction layer using functional interfaces in Java 8 and Scala 2.11.

Let's start with Java 8.
First the definition of two functional interfaces:
import javax.jms.Session;

@FunctionalInterface
public interface MessagingSession {
  void withSession(final Session session);
}
and import javax.jms.MessageProducer;
@FunctionalInterface
public interface MessagingProducer {
  void produce(final MessageProducer producer);
}
The main class:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class Messaging {
  private final boolean transacted = false;
  private final int ackMode = Session.AUTO_ACKNOWLEDGE;
 
  private final ConnectionFactory connectionFactory;
  private final Connection connection;
 
  private Session session = null;
 
  public Messaging() throws Exception {
    this.connectionFactory = new ActiveMQConnectionFactory("vm://localhost");
    this.connection = connectionFactory.createConnection();
    connection.start();
  }
 
  public Destination createQueue(final String qn) throws Exception {
    return session.createQueue(qn);
  }
 
  public TextMessage createTextMessage(final String text) throws Exception {
    return session.createTextMessage(text);
  }
 
  public void withSession(final MessagingSession messagingSession) throws Exception {
    session = connection.createSession(transacted, ackMode);
    messagingSession.withSession(session);
  }
 
  public void withProducer(final Destination destination, final MessagingProducer messagingProducer) throws Exception {
    messagingProducer.produce(session.createProducer(destination));
  }
}
And a demo class - how to use the above:
import javax.jms.Destination;
import javax.jms.TextMessage;

public class JavaMessagingDemo {
  public static void main(String[] args) throws Exception {
    final Messaging messaging = new Messaging();
  
    messaging.withSession((session) -> {
      try {
        final Destination destination = messaging.createQueue("ANOTHER.QUEUE");
        messaging.withProducer(destination, (producer) -> {
          try {
            final TextMessage message = messaging.createTextMessage("MY.MESSAGE");
            producer.send(message);
          } catch (Exception e) {
            e.printStackTrace();
          }
        });
      } catch (Exception e) {
        e.printStackTrace();
      }
    });
  }
}

In Scala 2.11:
import javax.jms.Session
import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.Destination
import javax.jms.MessageProducer

object Messaging {
  val transacted = false
  val ackMode = Session.AUTO_ACKNOWLEDGE
  val connectionFactory = new ActiveMQConnectionFactory("vm://localhost")
  val connection = connectionFactory.createConnection
  connection.start
  
  implicit def jms2richsession(session: Session) = new EnrichedJMSSession(session)
  
  def withSession(f: Session => Unit) = f(connection.createSession(transacted, ackMode))
  
  class EnrichedJMSSession(val session: Session) {
    def withProducer(dest: Destination)(f: MessageProducer => Unit) = f(session.createProducer(dest))
  }
}

object ScalaDemo {
  import Messaging._
  
  def main(args: Array[String]): Unit = {
    withSession { session =>  
      val dest = session.createQueue("MY.QUEUE")
      session.withProducer(dest) { producer =>
        val message = session.createTextMessage("Some message")
        producer.send(message)
      }
    }
  }
}

Thursday, 26 March 2015

Group By Sum Scala 2.11 vs Java 8

I had to recently implement an aggregate function in Scala 2.11 and Java 8... Which one do you prefer?

Scala 2.11
case class Datum(val id: String, val count: Int)

object GroupBySumScalaDemo {
  val list = List(Datum("a", 1), Datum("a", 2), Datum("a", 3),
                  Datum("b", 4), Datum("b", 5),
                  Datum("c", 6))
                  
  // Want to group by 'id' and sum all values
  val map = list.groupBy(_.id).mapValues(_.map(_.count).sum)   
  
  println(map)
  
  def main(args: Array[String]): Unit = {
  }
}
Java 8
class MyDatum {
  private final String id;
  private final int count;
 
  public MyDatum(final String id, final int count) {
    this.id = id;
    this.count = count;
    }

  public String getId() {
    return id;
  }

  public int getCount() {
    return count;
  }
}

public class GroupBySumJava8Demo {
  public GroupBySumJava8Demo() {
    final List list = Arrays.asList(
      new MyDatum("a", 1), new MyDatum("a", 2), new MyDatum("a", 3),
      new MyDatum("b", 4), new MyDatum("b", 5),
      new MyDatum("c", 6));
    final Map map = list.stream()
      .collect(Collectors.groupingBy(MyDatum::getId, Collectors.summingInt(MyDatum::getCount)));
    System.out.println(map);
  }

  public static void main(String[] args) {
    new GroupBySumJava8Demo();
  }
}
I know which one I like ;-)

Saturday, 7 March 2015

Monadic behaviour

Today, home-sick, read and found a bunch of nice links on Scala and Monads. For future reference, I paste those links here.
Monads in Scala I
Haskell Monads
Category Theory
Monads are elephant I, Monads are elephant II, Monads are elephant III, Monads are elephant IV
Scalaz Monads

Saturday, 21 February 2015

RPC, Protobuf, Scala

Seen too many custom frameworks for RPC, asynchronous callbacks, custom serialization, etc...... There are great frameworks out there to handle this. Let's have a look at a simple Search service, asynchronous request, using protobuf and Scala. Some *.proto def where the services are defined, and the data to marshall in and out.
syntax = "proto2";

package myprotocol;

message SearchRequest {
 required int32 requestid = 1;
}

message SearchResponse {
 required int32 requestid = 1;
 required int32 responseid = 2;
}
A simple RPC call definition:
syntax = "proto2";
option java_generic_services = true;

package myprotocol;

import "Search.proto";

service SearchService {
  rpc Search (SearchRequest) returns (SearchResponse);
}
Then you define two classes...the one that implements your RpcChannel is the one where you plugin your underlying messaging system, such as JMS, Solace, Tibco RV, or sockets...
package demo

import java.util.logging.Logger

import com.google.protobuf.Descriptors.MethodDescriptor
import com.google.protobuf.Message
import com.google.protobuf.RpcCallback
import com.google.protobuf.RpcChannel
import com.google.protobuf.RpcController

import myprotocol.Search.SearchRequest
import myprotocol.Search.SearchResponse

class SChannel extends RpcChannel {
  val logger = Logger.getLogger("SChannel")
  
  override def callMethod(methodDescriptor: MethodDescriptor , 
                          rpcController: RpcController , 
                          m1: Message, 
                          m2: Message, 
                          rpcCallback: RpcCallback[Message] ) {
    m1 match {
      case sr: SearchRequest => {
        val response = SearchResponse.newBuilder.setRequestid(sr.getRequestid).setResponseid(2).build
        rpcCallback.run(response)
      }
      case _ => logger.warning(s"Not handling message type $m1 yet")
    }
  } //  override def callMethod
}

class SController extends RpcController {
  val logger = Logger.getLogger("SController")
  
  override def reset() {
    logger.info("reset()")
  }

  override def failed(): Boolean = {
    logger.info("failed()")
    false
  }

  override def errorText(): String = {
    logger.info("errorText()")
    null
  }

  override def startCancel() {
    logger.info("startCancel()")
  }

  override def setFailed(s: String) {
    logger.info(s"setFailed($s")
  }

  override def isCanceled():Boolean = {
    logger.info("isCanceled()")
    false
  }

  override def notifyOnCancel(rpccallback: RpcCallback[Object]) {
    logger.info(s"notifyOnCancel($rpccallback)")
  }
}
Edited I Forgot the code to actually use those classes :-)
package demo

import java.util.logging.Logger

import com.google.protobuf.RpcCallback

import myprotocol.Search.SearchRequest
import myprotocol.Search.SearchResponse
import myprotocol.Services

object RPCDemo {
  val logger = Logger.getLogger("RPCDemo")
  
  def main(args: Array[String]): Unit = {
    val channel = new SChannel
    val controller = new SController
    val services = Services.SearchService.newStub(channel)
    val request = SearchRequest.newBuilder().setRequestid(1).build
    val callback = new RpcCallback[SearchResponse] {
      override def run(obj: SearchResponse) = logger.info(s"Received on RpcCallback $obj")
    }
    services.search(controller, request, callback)
    
    logger.info("Sleeping 5 seconds")
    Thread.sleep(5000)
    logger.info("Ciao")
  }
}
For example, if you are looking for a pre-built TCP/IP socket implementation on top of protobuf, check out protobuf-socket-rpc A detailed view of a custom channel can be seen there: RpcChannelImpl.java

Edited II You can obviously define an implict instead of this callback a la Java:
  import scala.language.implicitConversions
  implicit def f2cb(f: (SearchResponse) => Unit) = new RpcCallback[SearchResponse] {
    override def run(sr: SearchResponse) = f(sr)
  }

  services.search(controller, request, (resp: SearchResponse) => {
    logger.info(s"Received on RpcCallback $resp")
  })
... Voila.

Sunday, 11 January 2015

Ouray! Just completed my Machine Learning Course!

Completed my ML course from Coursera! Lots of work!

RedMart is hiring

If you are interested, please check this link. Another neat implementation in Scala is fairly easy...

Blog Archive