Skip to content

Conversation

@cb-freddysart
Copy link
Contributor

This PR adds support for rd_kafka_oauthbearer_set_token and rd_kafka_oauthbearer_set_token_failure. See https://github.com/confluentinc/librdkafka/pull/2189/files#diff-eef17694b5807cd63a95c3d86c39752b62f8bdb46e46c59231bf02353a5daef5 for where this functionality was added to librdkafka.

With these changes, we are now able to successfully authenticate to AWS MSK via IAM. 🎉

Full working example:

<?php

use Aws\Credentials\CredentialProvider;
use Aws\Signature\SignatureV4;
use GuzzleHttp\Psr7\Request;
use Psr\Http\Message\RequestInterface;

require 'vendor/autoload.php';

class AuthToken {
    public function __construct(public string $tokenValue, public int $expiresAt) {}
}

class MSKAuthTokenProvider {

    private const ENDPOINT_URL_TEMPLATE = 'https://kafka.%s.amazonaws.com';
    private const DEFAULT_TOKEN_EXPIRY_SECONDS = 900;
    private const ACTION_TYPE = "Action";
    private const ACTION_NAME = "kafka-cluster:Connect";
    private const SIGNING_NAME = "kafka-cluster";
    private const USER_AGENT_KEY = "User-Agent";
    private const VERSION_KEY = 'Version';

    public function generateAuthToken(): AuthToken
    {
        $endpoint_url = sprintf(self::ENDPOINT_URL_TEMPLATE, getenv('AWS_REGION'))
            . '?' . http_build_query([self::ACTION_TYPE => self::ACTION_NAME]);

        $request = new Request('GET', $endpoint_url);
        $credentials = CredentialProvider::defaultProvider()()->wait();
        $sig_v4_signer = new SignatureV4(self::SIGNING_NAME, getenv('AWS_REGION'));
        $expiresAt = new \DateTime('+' . self::DEFAULT_TOKEN_EXPIRY_SECONDS . ' seconds');

        $presigned = $sig_v4_signer->presign($request, $credentials, $expiresAt);
        $presigned_uri = $presigned->getUri();
        // Append User-Agent and Version **after** signing
        $presigned_uri .= '&'. self::USER_AGENT_KEY . '='.urlencode('msk-iam-php').
            '&'. self::VERSION_KEY . '='. urlencode('2020_10_22'); // Version is always static

        $base64_bytes = base64_encode($presigned_uri);
        $base64_encoded_signed_url = rtrim($base64_bytes, '=');

        return new AuthToken($base64_encoded_signed_url, $this->getExpirationTimeMs($presigned));
    }

    private function getExpirationTimeMs(RequestInterface $request): int
    {
        parse_str($request->getUri()->getQuery(), $ret);
        $signing_time = DateTimeImmutable::createFromFormat('Ymd\THis\Z', $ret['X-Amz-Date'], new DateTimeZone('UTC'))
            ->getTimestamp();
        $lifetime_seconds = $ret['X-Amz-Expires'];
        $expiration_ts_seconds = $signing_time + $lifetime_seconds;

        return $expiration_ts_seconds * 1000;
    }
}

$provider = new MSKAuthTokenProvider();
$token = $provider->generateAuthToken();

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', 'broker list');
$conf->set('security.protocol', 'SASL_SSL');
$conf->set('sasl.mechanisms', 'OAUTHBEARER');

$producer = new RdKafka\Producer($conf);

$conf->setOauthbearerTokenRefreshCb(function() use ($provider, $producer) {
    $token = $provider->generateAuthToken();
    $producer->oauthbearerSetToken($token->tokenValue, $token->expiresAt, 'kafka-cluster', ['one' => 'two']);
});

// Set initial token
$producer->oauthbearerSetToken($token->tokenValue, $token->expiresAt, 'kafka-cluster', ['one' => 'two']);

$topic = $producer->newTopic("test");
for ($i = 0; $i < 1; $i++) {
    $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Hello from PHP $i");
    $producer->poll(0);
}

for ($flushRetries = 0; $flushRetries < 10; $flushRetries++) {
    $result = $producer->flush(10000);
    if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
        break;
    }
}

echo "Done\n";

@PyaeSoneAungRgn
Copy link

yes, we need this feature 🔥

btw, i already created aws-msk-iam-sasl-signer-php package for Amazon MSK IAM Authentication

Copy link
Owner

@arnaud-lb arnaud-lb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

Do you think it would be feasible to add tests for these methods?

Currently we start a Kafka instance in CI here: https://github.com/arnaud-lb/php-rdkafka/blob/bcd5004f461d1d3a5f879bb21280bdde6f6800c2/.github/workflows/test/start-kafka.sh.

@cb-freddysart
Copy link
Contributor Author

Nice!

Do you think it would be feasible to add tests for these methods?

Currently we start a Kafka instance in CI here: https://github.com/arnaud-lb/php-rdkafka/blob/bcd5004f461d1d3a5f879bb21280bdde6f6800c2/.github/workflows/test/start-kafka.sh.

Thank you for the great review @arnaud-lb . I think I’ve got this all fixed locally but I want to take the time to get valgrind setup to be sure (I’m on a m1 which is not supported yet so I need to break out my Linux machine).

I think I should be able to write some integration tests for this. Hopefully should have something up for review by the end of the week.

@cb-freddysart cb-freddysart force-pushed the feat/oauthbearer-set-token branch from 6a40ad1 to 41c5d1f Compare January 18, 2024 13:26
@stefan-bolsinger-hald
Copy link

I'm thrilled that the PR for oauth2 has already been implemented because I urgently need this feature for an application. Is it possible to provide a new release with this feature in the short term? That would be a big help for me.

@cb-freddysart
Copy link
Contributor Author

Hey all, I hope to get this over the line but I've had a shift in priorities after coming back from some time off. Configuring the OAuth side of things on Kafka is new to me and I haven't had the time to figure out exactly how that should work (for example, do we also need to spin up a keycloak container for generating and validating the tokens?).

I'm open to any help if anyone else is more familiar in this area. Otherwise, I'll keep poking at this when I have time.

@scorgn
Copy link
Contributor

scorgn commented Oct 1, 2024

What left is needed for this MR? Is there anything other than integration tests?

I'm not too familiar with C but I would be happy to help brush up on it and work on implementing integration tests if that is all that is needed for merging.

@devenjahnke
Copy link

@arnaud-lb What is blocking this PR from being merged? I'm happy to take a stab at rounding it out.

I need this feature for AWS IAM authentication with MSK, and I will be blocked until this merges.

Cheers!

@cb-freddysart
Copy link
Contributor Author

afaik, the only thing blocking this PR is the additional tests that @arnaud-lb asked for. I haven't had the time to figure out how to get kafka set up properly.

* Fix seg fault in oauthbearerSetToken definition in rdkafka.c when not passing value for extension
* Fix issue with oauthbearer_token_refresh not persisting (was not included in kafka_conf_callbacks_copy)
* Modify start-kafka.sh to start kafka container with oauth2 configuration
* Implement oauth2 integration tests
… support it

* Add new test env var SKIP_OAUTH based on matrix.skipoauth
* Set matrix.skipoauth on all librdkafka versions below v1.1.0
* Don't set up kafka_oauth2 container if SKIP_OAUTH is 1
* Skip tests in oauthbearer_integration.phpt if RD_KAFKA_VERSION is below 0x010100
@scorgn
Copy link
Contributor

scorgn commented Oct 14, 2024

@cb-freddysart I’ve submitted a PR to your branch that starts a Kafka container with the oauthbearer security protocol alongside the regular Kafka container, and implements integration testing for the oauthbearer methods. It also includes a few small related fixes in the rdkafka extension. Would you be able to review and merge it into your branch?

cb-freddysart#1

Once that's merged, we'll need to resolve merge conflicts. I decided not to merge 6.x into my branch and resolve conflicts beforehand to make the PR into your branch easier to review with fewer changes.

After 6.x is merged into this branch, there’s one additional fix required for a test implemented in 6.x that doesn’t exist yet in this branch. The test is located at ./tests/controller_id.phpt. It expects the broker ID to be 1001, which is the generated broker ID when none is set. However, my changes manually set the broker ID to 1, so we’ll need to update the test to expect 1 instead of 1001.

I had to manually set the broker IDs in order to have two separate Kafka containers using the same Zookeeper instance, and I’m unable to set the broker ID above 1000. Therefore, we can’t retain the broker ID that the future test expects.

@arnaud-lb
Copy link
Owner

Thanks a lot for the help @scorgn. The PR will be mergeable with these changes.

@scorgn
Copy link
Contributor

scorgn commented Oct 15, 2024

Happy to help! Thanks for confirming.

@cb-freddysart I also created another PR that does have 6.x merged in, conflicts resolved, and that test fixed. It will have more changes since it also includes commits from 6.x, but I created it just in case it is easier for you to merge in that PR and not have to fix conflicts/test.

cb-freddysart#2

Will close whichever one is not merged.

@cb-freddysart
Copy link
Contributor Author

Great work @scorgn! I've approved both PRs. Let's merge the second PR since you have already gone to the trouble of pulling in 6.x?

@scorgn
Copy link
Contributor

scorgn commented Oct 15, 2024

@cb-freddysart Yup that makes sense! I am not able to merge but you should be able to.

cb-freddysart and others added 3 commits October 15, 2024 15:19
…nflicts

Implement integration tests for oauth methods (also merge 6.x, solve conflicts, fix test)
…/php-rdkafka into feat/oauthbearer-set-token

- name: 'Archive package'
uses: 'actions/upload-artifact@v2'
uses: 'actions/upload-artifact@v4'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@scorgn
Copy link
Contributor

scorgn commented Oct 15, 2024

Do we need to add the new oauth test file to package.xml?

@cb-freddysart
Copy link
Contributor Author

@scorgn, it's not totally clear to me as I've only be debugging this through action runs, but it looks like we might have a little flakiness with the kafka setup. I've increased the metadata timeout and enabled debug logs for the broker for the time being. This got the tests to more reliably get past kafka setup, but now we are seeing intermittent failures around topics not existing.

@scorgn
Copy link
Contributor

scorgn commented Oct 16, 2024

@cb-freddysart Is the flakiness with those tests with v0.11.6 that say unknown broker? If so those are also flaky in 6.x and have been for a little bit. If we want to try to fix those I think a separate PR would be better, but I don't think it needs to be addressed for this PR.

@arnaud-lb Do checks look ok as far as this being mergeable?

@eborden
Copy link

eborden commented Oct 23, 2024

@arnaud-lb any idea when this will merge and be released? I'm blocked until this gets in.

@arnaud-lb arnaud-lb merged commit 743f1d3 into arnaud-lb:6.x Oct 24, 2024
48 of 53 checks passed
@arnaud-lb
Copy link
Owner

Thank you @cb-freddysart @scorgn !

@arnaud-lb
Copy link
Owner

@oaattia
Copy link

oaattia commented Mar 10, 2025

Hi, thank you for the PR, @arnaud-lb.

I have a question about this: Does the OAUTHBEARER only support low-level APIs? I’ve noticed that it hasn't been implemented for high-level APIs as well

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

8 participants