Monday, May 15, 2017

Kafka Security - Part 3

So far in the series, I have covered how to configure SASL with PlainText (Part 1)  and how to have different listeners for broker and consumers (Part 2)

In this part, I would dissect the org.apache.kafka.common.security.plain.PlainLoginModule which was configured for SASL with PlainText in Part 1 of this series. This would give insight into developing your custom LoginModule over SASL.

PlainLoginModule: 
public class PlainLoginModule implements LoginModule {
    private static final String USERNAME_CONFIG = "username";
    private static final String PASSWORD_CONFIG = "password";

    static {
        PlainSaslServerProvider.initialize();
    }

public PlainLoginModule() {
    }

    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
        String username = (String)options.get("username");
        if(username != null) {
            subject.getPublicCredentials().add(username);
        }

        String password = (String)options.get("password");
        if(password != null) {
            subject.getPrivateCredentials().add(password);
        }

    }

    public boolean login() throws LoginException {
        return true;
    }

    public boolean logout() throws LoginException {
        return true;
    }

    public boolean commit() throws LoginException {
        return true;
    }

    public boolean abort() throws LoginException {
        return false;
    }
}

The same class PlainLoginModule is used for dual purpose which makes it confusing.

  1. The Broker to pass in username and password to authenticate against other Brokers
  2. The Broker to initialize a Server Provider which supports authentication using SASL
In the initialize method of the LoginModule, the Broker adds the configured username and password in the Subject. 
This Subject is used by this broker to send in the username/password for authentication by the other user. 

 public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
        String username = (String)options.get("username");
        if(username != null) {
            subject.getPublicCredentials().add(username);
        }

        String password = (String)options.get("password");
        if(password != null) {
            subject.getPrivateCredentials().add(password);
        }

    }


In the static block of the PlainLoginModule class, we are initializing the Sasl Server Provider

    static {
        PlainSaslServerProvider.initialize();
    }

PlainLoginModule is used to configure the SASL server provider in the Kafka Broker. 
The Broker uses the SASL server Provider to authenticate the in-coming call to the broker.

In the initialize method of PlainSaslServerProvider, we are configuring PlainSaslServerProvider as a Security Provider.
The PlainSaslServerProvider configures PlainSaslServerFactory as the SaslServerFactory. 

public class PlainSaslServerProvider extends Provider {
    private static final long serialVersionUID = 1L;

    protected PlainSaslServerProvider() {
        super("Simple SASL/PLAIN Server Provider", 1.0D, "Simple SASL/PLAIN Server Provider for Kafka");
        super.put("SaslServerFactory.PLAIN", PlainSaslServerFactory.class.getName());
    }

    public static void initialize() {
        Security.addProvider(new PlainSaslServerProvider());
    }
}

The PlainSaslServerFactory provides PlainSaslServer as the SaslServer. 
PlainSaslServer evaluates the username/password in the evaluateResponse method. 

Tuesday, May 9, 2017

Kafka Security - Part 2



In the previous blog post Kafka Security - Part 1, I describe in length on how to setup SASL Plaintext as the security mechanism.

In this blog post, I would explain on how to setup security differently for Broker to Broker communication and Client (Producer/Consumer) to Broker communication.

In Kafka, you can set up multiple listeners

  1. Setup two Listeners in servers.properties:

    listeners=CLIENT_LISTENER://:9091, BROKER_LISTENER://:9092
  2. Setup security map for the custom listeners:

    listener.security.protocol.map= CLIENT_LISTENER:SASL_PLAINTEXT, BROKER_LISTENER: SASL_PLAINTEXT
  3. Setup broker to broker communication to use BROKER_LISTENER

    security.inter.broker.protocol=BROKER_LISTENER
  4. Setup broker to broker communication to use PLAIN, for this scenario

    sasl.mechanism.inter.broker.protocol=PLAIN
  5. Setup Client (Producer/Consumer) to Broker communication to use PLAIN for this case.

    sasl.enabled.mechanisms=PLAIN


For this scenario, we expect clients to communicate with broker on port 9091 and brokers to communicate with another broker on port 9092.

If you have Kafka sitting in a different VLAN than your client, then in that case, you can have port 9092 (for Broker-Broker communication) not exposed out side the VLAN

Wednesday, April 26, 2017

Kafka Security - Part 1


Custom Kafka Security - Part 1

In this series, my hope is to explain on how to configure Kafka Security and how to setup custom Kafka Security. The current documentation about configuring Kafka Security is basically limited to few pages. I hoping to fill in the gaps and provide more information about my findings. As a disclaimer, I do not pretend to be a security expert or Kafka expert.


Kafka supports various types of authentication methods as described in the Apache Kafka documentation


In this first case, we would like to secure Broker to Broker and Client to Broker using SASL/Plain and without TLS.

Authentication using SASL/PLAIN without TLS

This article provides the configuration for authentication of connections to brokers from clients (producers and consumers), using SASL without TLS. 

SASL/PLAIN is a simple username/password authentication mechanism to implement secure authentication. Kafka supports a default implementation for SASL/PLAIN. Apache Kafka documentation provides detail on how it can be extended for production here.The username is used as the authenticated Principal for configuration of ACLs etc.

The first step is to configure the broker to authenticate the in-coming call using SASL with PlainText. This call can come from other brokers or clients such as consumer or producer.
The second step is to configure the broker to call other brokers using SASL with PlainText. 
The third steps is to configure the clients (producer/consumer) to use SASL with PlainText.

"Configuring Kafka Brokers" section takes care of the first and second steps.


Configuring Kafka Brokers: 

1. Add a suitably modified JAAS file similar to the one below to each Kafka broker's config directory, let's call it kafka_server_jaas.conf for this example:
KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin-secret"
   user_admin="admin-secret"
   user_alice="alice-secret";
};
Let's try to understand this: 

The properties username and password in the KafkaServer section are used by the broker to initiate connections to other brokers. In this example, admin is the user for inter-broker communication. 

The set of properties user_userName defines the passwords for all users that connect to the broker. The broker validates all client connections including those from other brokers using these properties. This configuration defines two users (admin and alice). 

2. Pass the JAAS config file location as JVM parameter to each Kafka broker:
-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf
3. Configure SASL port and SASL mechanisms in server.properties. These are properties that would be configured:
listeners=OUR_LISTENER://:9092
advertised.listeners= OUR_LISTENER://:9092
security.inter.broker.protocol= OUR_LISTENER
listener.security.protocol.map= OUR_LISTENER:SASL_PLAINTEXT

sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

Let's try to understand this:

listeners=OUR_LISTENER://:9092
advertised.listeners= OUR_LISTENER://:9092
You are defining a listener with the name OUR_LISTENER.  You can give any name to the listener. 

security.inter.broker.protocol= OUR_LISTENER
#The listener to communicate with for Broker to Broker communication

listener.security.protocol.map= OUR_LISTENER:SASL_PLAINTEXT
#The actual mapping of security protocol to listener name.
#Here we are specifying that we are using SASL with PLAINTEXT (not SSL)

sasl.mechanism.inter.broker.protocol=PLAIN
#The inter broker SASL mechanism uses PLAIN text (not SSL)

sasl.enabled.mechanisms=PLAIN
#The client broker SASL mechanism uses PLAIN text (not SSL)

SASL with PlainTexy Configuration for Kafka clients

1. Add a suitably modified JAAS config file to one below to the Client directory
KafkaClient { 
     org.apache.kafka.common.security.plain.PlainLoginModule required
     username="alice"
     password="alice-secret";
};

Let's try to understand this :
The username used to call the broker is specified as "alice" with password "alice-secret". 

2. Pass the JAAS config file location as JVM parameter to each client JVM. For example:
-Djava.security.auth.login.config=/etc/kafka/kafka_client_jaas.conf
3. Instead of 1 and 2, you can provided the JAAS configuration in the Client properties

props.put("sasl.jaas.config", 
        "org.apache.kafka.common.security.plain.PlainLoginModule required\n" +
        "username=\"alice\"\n" +
        "password=\"alice-secret\";");
Here, we have configured to secure the Kafka Broker to use SASL with PlainText.
In the next blog kafka-security-part-2, I would explain on how to setup Kafka configuration for Broker to Broker communication and Client to Broker communication



Wednesday, September 30, 2015

Adding Spring Boot Actuator to a non Spring Boot Application

Spring Boot is excellent for setting up application quickly without re-developing any of the boiler plate (infrastructure) code.

The instructions to create Spring Boot application are provided in the spring.io link. The Spring Intializr provides a quick way to create an application. Spring Boot provides ways to integrate with the different frameworks. 

One of the key features of the Spring Boot is the spring-actuator which provides production-ready features out of the box. Spring Boot actuator exposes various endpoints which provides information about the running application. The details of the various endpoints are provided in the Spring Boot documentation

There were several questions that came to my mind which I read more about Spring Boot Actuator. 
  • What if you have a web application which uses Spring but it is not a Spring Boot application?
  • Would you have to implement all the production-ready features such as Health endpoint, Info endpoint, etc. again in your application?
  • Do you have to re-invent the wheel which the Spring team has done in the Spring Boot Actuator project?
  • Would it not be great that you could create the dependency on the Spring Boot Actuator jar and get all the functionalities from it? 

There are several blog posts which seem to suggest that it can be done. I tried following the instructions, but it seems to not work with the version of Spring Boot that I was trying to use ( version 1.2.5 ). 
So, I spend sometime debugging and looking at the Spring Boot Actuator code and figure out how I can set up Spring Boot actuator in a non-boot project. 

This is what worked for me. 

In the application's build.gradle, I added the following dependency
compile('org.springframework.boot:spring-boot-actuator:1.2.5.RELEASE'){
    exclude group: 'org.springframework.boot', module:'spring-boot-starter-logging'}

In the application's Spring Config class, I added the following things:

 import org.springframework.beans.factory.annotation.Autowired;  
 import org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration;  
 import org.springframework.boot.actuate.endpoint.BeansEndpoint;  
 import org.springframework.boot.actuate.endpoint.HealthEndpoint;  
 import org.springframework.boot.actuate.endpoint.InfoEndpoint;  
 import org.springframework.boot.actuate.endpoint.RequestMappingEndpoint;  
 import org.springframework.boot.actuate.endpoint.mvc.EndpointHandlerMapping;  
 import org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter;  
 import org.springframework.boot.actuate.endpoint.mvc.HealthMvcEndpoint;  
 import org.springframework.boot.actuate.endpoint.mvc.MvcEndpoint;  
 @Configuration  
 @Import(EndpointAutoConfiguration.class)  
 public class MyAppSpringConfig {  
   @Bean  
   @Autowired  
   //Define the HandlerMapping similar to RequestHandlerMapping to expose the endpoint  
   public EndpointHandlerMapping endpointHandlerMapping(  
     Collection<? extends MvcEndpoint> endpoints  
   ){  
     return new EndpointHandlerMapping(endpoints);  
   }  
   @Bean  
   @Autowired  
   //define the HealthPoint endpoint  
   public HealthMvcEndpoint healthMvcEndpoint(HealthEndpoint delegate){  
     return new HealthMvcEndpoint(delegate, false);  
   }  
   @Bean  
   @Autowired  
   //define the Info endpoint  
   public EndpointMvcAdapter infoMvcEndPoint(InfoEndpoint delegate){  
     return new EndpointMvcAdapter(delegate);  
   }  
   @Bean  
   @Autowired  
   //define the beans endpoint  
   public EndpointMvcAdapter beansEndPoint(BeansEndpoint delegate){  
     return new EndpointMvcAdapter(delegate);  
   }  
   @Bean  
   @Autowired  
   //define the mappings endpoint  
   public EndpointMvcAdapter requestMappingEndPoint(  
     RequestMappingEndpoint delegate  
   ){  
     return new EndpointMvcAdapter(delegate);  
   }  
 }  

Voila, you have added the spring actuator to your application.

In case, you want to get rid of additional dependency of spring-boot-autconfigure from your application, you would have to do the following to integrate it.

In the gradle, you would add: 
compile('org.springframework.boot:spring-boot-actuator:1.2.5.RELEASE'){
  exclude group: 'org.springframework.boot', module:'spring-boot-starter-logging'  exclude group: 'org.springframework.boot', module:'spring-boot-autoconfigure'}

Create a Spring Config and import it to your application's Spring Config. In the example below, I have added only health endpoint and beans endpoint for your reference
 import org.springframework.beans.factory.annotation.Autowired;  
 import org.springframework.boot.actuate.endpoint.BeansEndpoint;  
 import org.springframework.boot.actuate.endpoint.HealthEndpoint;  
 import org.springframework.boot.actuate.endpoint.RequestMappingEndpoint;  
 import org.springframework.boot.actuate.endpoint.mvc.EndpointHandlerMapping;  
 import org.springframework.boot.actuate.endpoint.mvc.EndpointMvcAdapter;  
 import org.springframework.boot.actuate.endpoint.mvc.HealthMvcEndpoint;  
 import org.springframework.boot.actuate.endpoint.mvc.MvcEndpoint;  
 import org.springframework.boot.actuate.health.HealthAggregator;  
 import org.springframework.boot.actuate.health.HealthIndicator;  
 import org.springframework.boot.actuate.health.OrderedHealthAggregator;  
 import org.springframework.context.annotation.Bean;  
 import org.springframework.context.annotation.Configuration;  
 import java.util.Collection;  
 import java.util.HashMap;  
 import java.util.Map;  
 @Configuration  
 public class BootEndpointAutoConfiguration {  
 
   private HealthAggregator healthAggregator = new OrderedHealthAggregator();  
 
   private Map<String, HealthIndicator> healthIndicators = new HashMap();

   @Bean  
   @Autowired  
   //Define handler mapping  
   public EndpointHandlerMapping endpointHandlerMapping(  
     Collection<? extends MvcEndpoint> endpoints  
   ){  
     return new EndpointHandlerMapping(endpoints);  
   }  

   @Bean  
   @Autowired  
   //Define the beans for exposing the mapping  
   public EndpointMvcAdapter requestMappingEndPoint(RequestMappingEndpoint delegate){  
     return new EndpointMvcAdapter(delegate);  
   }  

   @Bean  
   @Autowired  
   //Define the bean to expose the health  
   public HealthMvcEndpoint healthMvcEndpoint(HealthEndpoint delegate){  
     return new HealthMvcEndpoint(delegate, false);  
   }  

   @Bean  
   //Define the bean to expose an health point  
   public HealthEndpoint healthEndpoint() {  
     return new HealthEndpoint(this.healthAggregator, this.healthIndicators);  
   }  

   @Bean  
   public BeansEndpoint beansEndpoint() {  
     return new BeansEndpoint();  
   }  

   @Configuration  
   protected static class RequestMappingEndpointConfiguration {  
     protected RequestMappingEndpointConfiguration() {  
     }  
     @Bean  
     public RequestMappingEndpoint requestMappingEndpoint() {  
       RequestMappingEndpoint endpoint = new RequestMappingEndpoint();  
       return endpoint;  
     }  
   }  
 }