Building a PHP client for Faktory, Part 5: Protocol

In this part, we'll add support for a few more parts of the Faktory protocol: the INFO and PUSHB commands, and password authentication.

INFO

The INFO command returns information about the Faktory instance. This is fairly straightforward:

class Client
{
  public function info(): array
  {
    $this->tcpClient->send("INFO");
    return $this->tcpClient->readLine(skipLines: 1);
  }
}

Testing:

$client = new Client(logLevel: Level::Debug);
ray($client->info());
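
For context, my understanding of the protocol is that Faktory replies to INFO with a RESP bulk string: a line holding the payload length, followed by the JSON payload itself. That would explain why we skip a line before reading. Here's a minimal standalone sketch of decoding such a reply, independent of our TcpClient. The parseBulkJson name and the sample payload are made up for illustration; they're not real server output.

```php
<?php
// Hypothetical helper: decode a raw RESP bulk string reply into an array.
function parseBulkJson(string $raw): array
{
    // A bulk string looks like "$<byte length>\r\n<payload>\r\n".
    $parts = explode("\r\n", $raw, 2);
    if (count($parts) !== 2 || !str_starts_with($parts[0], '$')) {
        throw new InvalidArgumentException("Not a bulk string: $raw");
    }

    // The header carries the payload size in bytes, after the "$" marker.
    $length = (int) substr($parts[0], 1);
    $payload = substr($parts[1], 0, $length);

    return json_decode($payload, true, 512, JSON_THROW_ON_ERROR);
}

// Made-up sample payload, for illustration only.
$json = '{"server":{"connections":1}}';
$raw = '$' . strlen($json) . "\r\n" . $json . "\r\n";
$info = parseBulkJson($raw);
```

Reading exactly the announced number of bytes, rather than everything up to the next newline, keeps the parser correct even if the payload itself contains line breaks.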

PUSHB

The PUSHB command is a recent addition to the protocol. It pushes a batch of jobs in a single request.

class Client
{
  public function pushBulk(array ...$jobs)
  {
    $this->tcpClient->sendAndRead("PUSHB", Json::stringify($jobs));
    return $this->tcpClient->readLine();
  }
}

It returns an array of the jobs that failed to be pushed, mapping each job's ID to the reason it was rejected:

$job1 = [
    "jid" => "test_job_1",
    "jobtype" => "SomeJobClass",
    "args" => [1, 2, true, "hello"],
];
$job2 = [
    "jid" => "test_job_2",
];
$job3 = [
    "jid" => "test_job_3",
    "jobtype" => "SomeJobClass",
];
dump($client->pushBulk($job1, $job2, $job3));

Output:

[
  "test_job_2" => "jobs must have a jobtype parameter"
  "test_job_3" => "jobs must have an args parameter"
]
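
Since the response only lists failures, working out which jobs were accepted means comparing it against what we submitted. Here's a small sketch of that, assuming the failure map has the shape shown above. The summarizePush helper is hypothetical, not part of the client.

```php
<?php
// Hypothetical helper: split submitted jobs into accepted and rejected JIDs,
// given the failure map returned by PUSHB (JID => error message).
function summarizePush(array $jobs, array $failures): array
{
    $submitted = array_column($jobs, 'jid');
    // Any submitted JID that is absent from the failure map was accepted.
    $accepted = array_values(array_diff($submitted, array_keys($failures)));

    return ['accepted' => $accepted, 'rejected' => $failures];
}

$jobs = [
    ["jid" => "test_job_1", "jobtype" => "SomeJobClass", "args" => [1, 2, true, "hello"]],
    ["jid" => "test_job_2"],
    ["jid" => "test_job_3", "jobtype" => "SomeJobClass"],
];
$failures = [
    "test_job_2" => "jobs must have a jobtype parameter",
    "test_job_3" => "jobs must have an args parameter",
];
$summary = summarizePush($jobs, $failures);
```

A caller could use the rejected entries to log or retry individual jobs without resubmitting the whole batch.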

Authentication

Now, let's add support for password authentication, as described in the Faktory docs.

The protocol states that, if a password is required, the server's HI message will contain two extra parameters, s and i:

HI {"v":2,"s":"123456789abc","i":1735}

s is the nonce, which we'll append to the password the user has given us. We'll then hash the result with SHA-256 i times, feeding each round's raw output into the next, and send the final result back (as a hex string) in a pwdhash parameter.

In PHP, we'll use the hash function for the hashing, and then bin2hex to convert the resulting hashed bytes into a hex string. Here's what that looks like in our TcpClient class:

  protected function passwordHash(array $hiResponse): array
  {
    $requiresPassword = isset($hiResponse['s']) && isset($hiResponse['i']);
    if (!$requiresPassword) {
      return [];
    }

    $nonce = $hiResponse['s'];
    $iterations = $hiResponse['i'];
    $data = $this->password . $nonce;
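    // A plain for loop avoids building a throwaway array, and unlike
    // range(1, 0), it runs zero times if $iterations is ever 0.
    for ($i = 0; $i < $iterations; $i++) {
      $data = hash("sha256", $data, binary: true);
    }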
    $data = bin2hex($data);
    return ["pwdhash" => $data];
  }
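
To experiment with the hashing outside the class, here's a standalone sketch of the same loop. The faktoryPasswordHash name and the passwords are made up; the nonce and iteration count come from the HI example above. One handy sanity check: with a single iteration, the result should equal a plain SHA-256 hex digest of the password plus nonce.

```php
<?php
// Standalone version of the hashing loop, for experimenting in isolation.
function faktoryPasswordHash(string $password, string $nonce, int $iterations): string
{
    $data = $password . $nonce;
    for ($i = 0; $i < $iterations; $i++) {
        // Keep raw bytes between rounds; only the final result is hex-encoded.
        $data = hash("sha256", $data, true);
    }

    return bin2hex($data);
}

// Nonce and iteration count from the HI example; the password is made up.
$hash = faktoryPasswordHash("my_special_password", "123456789abc", 1735);

// With one iteration, this matches hash("sha256", "secretabc").
$once = faktoryPasswordHash("secret", "abc", 1);
```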

And the rest of the class, with our adjusted handshake:

class TcpClient
{   
  public function __construct(
    protected LoggerInterface $logger,
    protected array $workerInfo = [],
    protected string $hostname = 'tcp://localhost',
    protected int $port = 7419,
    protected string $password = '',
  ) {
    $this->responseParser = (new ProtocolFactory())->createResponseParser();
  }

  protected function handshake()
  {
    $hiResponse = $this->readHi();
    $this->sendHello($hiResponse);
    $this->readLine(operation: "Handshake");
  }
  
  protected function readHi(): array
  {
    $hi = $this->readLine();
    if (empty($hi)) throw UnexpectedResponse::from("Handshake (HI)", $hi);
    
    $response = Json::parse(str_replace("HI ", "", $hi));
    $version = $response['v'];
    // ... Verify supported version

    return $response;
  }

  protected function sendHello(array $hiResponse)
  {
    $workerInfo = Json::stringify(array_merge(
        $this->workerInfo,
        ["v" => static::SUPPORTED_FAKTORY_PROTOCOL_VERSION],
        $this->passwordHash($hiResponse),
    ));
    $this->send("HELLO", $workerInfo);
  }
  
  // ...
}
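
One caveat with readHi: str_replace removes every occurrence of "HI " in the line, not just the leading one. That's unlikely to matter for a real greeting, but a stricter parser is easy to sketch. This standalone version assumes the line arrives with the leading "+" already stripped (as the exception messages in this series suggest); parseHi is a hypothetical name, not a method of our class.

```php
<?php
// Hypothetical standalone parser for the server's HI greeting.
function parseHi(string $line): array
{
    $prefix = "HI ";
    if (!str_starts_with($line, $prefix)) {
        throw new UnexpectedValueException("Expected a HI greeting, got: $line");
    }

    // Strip only the leading prefix, leaving the JSON body untouched.
    $json = substr($line, strlen($prefix));

    return json_decode($json, true, 512, JSON_THROW_ON_ERROR);
}

$hi = parseHi('HI {"v":2,"s":"123456789abc","i":1735}');
```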

To test this, we can start a Faktory server with a password set:

docker run --env="FAKTORY_PASSWORD=my_special_password" \
  -p 7419:7419 -p 7420:7420 -t -d contribsys/faktory:latest

Connecting with debug logging enabled shows the handshake succeeding:

faktory-php.INFO: Connecting to Faktory server on tcp://dreamatorium.local:7419 [] []
faktory-php.DEBUG: Received: +HI {"v":2,"i":7270,"s":"67be20f130141cea"}  [] []
faktory-php.DEBUG: Sending: HELLO {"hostname":"dreamatorium","wid":"test-worker-1","pid":13596,"labels":[],"v":2,"pwdhash":"bacaaef28942d32d87348a88f2cf96d4bebd2cce71b6e52e924ca01ccece14f8"}  [] []
faktory-php.DEBUG: Received: +OK  [] []

Nice. Here's what happens with a wrong password:

faktory-php.INFO: Connecting to Faktory server on tcp://dreamatorium.local:7419 [] []
faktory-php.DEBUG: Received: +HI {"v":2,"i":6575,"s":"1c2a2ab503ae4620"}  [] []
faktory-php.DEBUG: Sending: HELLO {"hostname":"dreamatorium","wid":"test-worker-1","pid":18876,"labels":[],"v":2,"pwdhash":"5606b95f18f1fc34df10482861263a7e62f2e883d57d7896a30b50cfb2920a5e"}  [] []
faktory-php.DEBUG: Received: -ERR Invalid password  [] []

PHP Fatal error:  Uncaught Knuckles\Faktory\Problems\UnexpectedResponse: Handshake returned an unexpected response: "ERR Invalid password" 

To make debugging easier for the end user, I'll add some custom exceptions:

class MissingRequiredPassword extends \Exception
{
  public static function forServer(string $address): self
  {
    return new self("Faktory server $address requires password, but none was provided.");
  }
}

class InvalidPassword extends \Exception
{
  public static function forServer(string $address): self
  {
    return new self("Authentication failed: Invalid password provided for the Faktory server $address");
  }
}

A note on exception design: It's important to be as helpful as possible (without risking security!):

  • I made two separate errors for a missing and an incorrect password, so it's easier to tell a forgotten env variable apart from an outdated value or a typo. The Faktory server can't distinguish between these cases, but we can, so we report a missing password ourselves before sending anything.

    protected function passwordHash(array $hiResponse): array
    {
      $requiresPassword = isset($hiResponse['s']) && isset($hiResponse['i']);
      if (!$requiresPassword) {
        return [];
      }
    
      if ($this->password === '') {
        throw MissingRequiredPassword::forServer("$this->hostname:$this->port");
      }
    
      // ... Hash password as usual
    }
    
  • The most helpful option would be to put the password in the error message, but that would be a security risk. The next best thing is to include the server address, in case the user has multiple servers and is connecting to the wrong one.
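
For the invalid-password case, the client needs to recognise the server's error reply and convert it into the matching exception. Here's one way that check could look, as a standalone sketch. It assumes the reply line has already had its "+" or "-" marker removed, and the assertHandshakeSucceeded name is made up for illustration.

```php
<?php
class InvalidPassword extends \Exception
{
    public static function forServer(string $address): self
    {
        return new self("Authentication failed: Invalid password provided for the Faktory server $address");
    }
}

// Hypothetical check to run on the final handshake reply.
// Assumes the reply line no longer carries its "+"/"-" marker.
function assertHandshakeSucceeded(string $reply, string $address): void
{
    if ($reply === "OK") {
        return;
    }
    if (str_starts_with($reply, "ERR Invalid password")) {
        throw InvalidPassword::forServer($address);
    }

    // Anything else is still an error, just not one we can name precisely.
    throw new RuntimeException("Handshake returned an unexpected response: \"$reply\"");
}

try {
    assertHandshakeSucceeded("ERR Invalid password", "tcp://localhost:7419");
    $message = null;
} catch (InvalidPassword $e) {
    $message = $e->getMessage();
}
```

Matching on the start of the reply keeps the check working if the server appends extra detail, while unrecognised replies still fail loudly.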

Finally, tests! Here we go:

it('raises an error if password is required but empty', function () {
    $tcpClient = tcpClient(port: 7423);
    expect(fn() => $tcpClient->connect())->toThrow(MissingRequiredPassword::class);
});

it('raises an error if the wrong password is supplied', function () {
    $tcpClient = tcpClient(port: 7423, password: 'some_incorrect_password');
    expect(fn() => $tcpClient->connect())->toThrow(InvalidPassword::class);
});

it('connects if the correct password is supplied', function () {
    $tcpClient = tcpClient(port: 7423, password: 'my_special_password');
    expect($tcpClient->connect())->toBeTrue()
        ->and($tcpClient->isConnected())->toBeTrue();
});

That's it for now with the protocol. Code on GitHub.


