Monday, May 15, 2017

Kafka Security - Part 3

So far in the series, I have covered how to configure SASL with PlainText (Part 1)  and how to have different listeners for broker and consumers (Part 2)

In this part, I would dissect the org.apache.kafka.common.security.plain.PlainLoginModule which was configured for SASL with PlainText in Part 1 of this series. This would give insight into developing your custom LoginModule over SASL.

PlainLoginModule: 
public class PlainLoginModule implements LoginModule {
    private static final String USERNAME_CONFIG = "username";
    private static final String PASSWORD_CONFIG = "password";

    static {
        PlainSaslServerProvider.initialize();
    }

public PlainLoginModule() {
    }

    public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
        String username = (String)options.get("username");
        if(username != null) {
            subject.getPublicCredentials().add(username);
        }

        String password = (String)options.get("password");
        if(password != null) {
            subject.getPrivateCredentials().add(password);
        }

    }

    public boolean login() throws LoginException {
        return true;
    }

    public boolean logout() throws LoginException {
        return true;
    }

    public boolean commit() throws LoginException {
        return true;
    }

    public boolean abort() throws LoginException {
        return false;
    }
}

The same class PlainLoginModule is used for dual purpose which makes it confusing.

  1. The Broker to pass in username and password to authenticate against other Brokers
  2. The Broker to initialize a Server Provider which supports authentication using SASL
In the initialize method of the LoginModule, the Broker adds the configured username and password in the Subject. 
This Subject is used by this broker to send in the username/password for authentication by the other user. 

 public void initialize(Subject subject, CallbackHandler callbackHandler, Map<String, ?> sharedState, Map<String, ?> options) {
        String username = (String)options.get("username");
        if(username != null) {
            subject.getPublicCredentials().add(username);
        }

        String password = (String)options.get("password");
        if(password != null) {
            subject.getPrivateCredentials().add(password);
        }

    }


In the static block of the PlainLoginModule class, we are initializing the Sasl Server Provider

    static {
        PlainSaslServerProvider.initialize();
    }

PlainLoginModule is used to configure the SASL server provider in the Kafka Broker. 
The Broker uses the SASL server Provider to authenticate the in-coming call to the broker.

In the initialize method of PlainSaslServerProvider, we are configuring PlainSaslServerProvider as a Security Provider.
The PlainSaslServerProvider configures PlainSaslServerFactory as the SaslServerFactory. 

public class PlainSaslServerProvider extends Provider {
    private static final long serialVersionUID = 1L;

    protected PlainSaslServerProvider() {
        super("Simple SASL/PLAIN Server Provider", 1.0D, "Simple SASL/PLAIN Server Provider for Kafka");
        super.put("SaslServerFactory.PLAIN", PlainSaslServerFactory.class.getName());
    }

    public static void initialize() {
        Security.addProvider(new PlainSaslServerProvider());
    }
}

The PlainSaslServerFactory provides PlainSaslServer as the SaslServer. 
PlainSaslServer evaluates the username/password in the evaluateResponse method. 

Tuesday, May 9, 2017

Kafka Security - Part 2



In the previous blog post Kafka Security - Part 1, I describe in length on how to setup SASL Plaintext as the security mechanism.

In this blog post, I would explain on how to setup security differently for Broker to Broker communication and Client (Producer/Consumer) to Broker communication.

In Kafka, you can set up multiple listeners

  1. Setup two Listeners in servers.properties:

    listeners=CLIENT_LISTENER://:9091, BROKER_LISTENER://:9092
  2. Setup security map for the custom listeners:

    listener.security.protocol.map= CLIENT_LISTENER:SASL_PLAINTEXT, BROKER_LISTENER: SASL_PLAINTEXT
  3. Setup broker to broker communication to use BROKER_LISTENER

    security.inter.broker.protocol=BROKER_LISTENER
  4. Setup broker to broker communication to use PLAIN, for this scenario

    sasl.mechanism.inter.broker.protocol=PLAIN
  5. Setup Client (Producer/Consumer) to Broker communication to use PLAIN for this case.

    sasl.enabled.mechanisms=PLAIN


For this scenario, we expect clients to communicate with broker on port 9091 and brokers to communicate with another broker on port 9092.

If you have Kafka sitting in a different VLAN than your client, then in that case, you can have port 9092 (for Broker-Broker communication) not exposed out side the VLAN