Demonstrates how to write kubectl plugins using ZIO libraries
The new ZIO K8s library makes extending the Kubernetes CLI toolkit quick and easy.
Andrea Peruffo recently published a blog post on the Lightbend blog about migrating a kubectl plugin from Go to Scala using the Fabric8 Kubernetes client and a few Scala libraries. This is a perfect use case for the zio-k8s library announced two weeks ago, so we decided to write this post demonstrating how to implement the same example using the ZIO ecosystem.
We are going to implement the same example, originally described in the Write a kubectl plugin in Java with JBang and fabric8 article, using the following libraries:

- clipp for command line argument parsing
- zio-k8s for talking to the Kubernetes cluster
- zio-logging for logging
- circe and circe-yaml for the JSON and YAML output formats
- asciitable for rendering ASCII tables
The source code of the example can be found here.
The linked blog post does a great job in explaining the benefits and difficulties of compiling to native image with GraalVM so we are not going to repeat it here. Instead, we will focus on how the implementation looks in the functional Scala world.
The example has to implement two kubectl commands: `version` to print the plugin's own version, and `list` to list information about all pods of the Kubernetes cluster in either ASCII table, JSON, or YAML format.
Let's start with defining these command line options with the clipp library!
First, we define the data structures that describe our parameters:
sealed trait Format
object Format {
case object Default extends Format
case object Json extends Format
case object Yaml extends Format
}
sealed trait Command
object Command {
final case class ListPods(format: Format) extends Command
case object Version extends Command
}
final case class Parameters(verbose: Boolean, command: Command)
When parsing the arguments (passed as an array of strings), we need to either produce a `Parameters` value or fail and print some usage information. With clipp, this is done by defining a parameter parser using its parser DSL in a for comprehension:
val spec =
for {
_ <- metadata("kubectl lp")
verbose <- flag("Verbose logging", 'v', "verbose")
commandName <- command("version", "list")
command <-
commandName match {
case "version" =>
pure(Command.Version)
case "list" =>
for {
specifiedFormat <- optional {
namedParameter[Format](
"Output format",
"default|json|yaml",
'o',
"output"
)
}
format = specifiedFormat.getOrElse(Format.Default)
} yield Command.ListPods(format)
}
} yield Parameters(verbose, command)
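For illustration, with the spec above a hypothetical invocation such as `kubectl lp -v list -o json` would be parsed into the following value (the exact flag placement accepted depends on clipp's parsing rules):

```scala
Parameters(verbose = true, command = Command.ListPods(Format.Json))
```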
As we can see, it is possible to make decisions in the parser based on previously parsed values, so each command can have a different set of arguments. In order to parse the possible output formats, we also implement the `ParameterParser` type class for `Format`:
implicit val parameterParser: ParameterParser[Format] = new ParameterParser[Format] {
override def parse(value: String): Either[String, Format] =
value.toLowerCase match {
case "default" => Right(Format.Default)
case "json" => Right(Format.Json)
case "yaml" => Right(Format.Yaml)
case _ => Left(s"Invalid output format '$value', use 'default', 'json' or 'yaml'")
}
override def example: Format = Format.Default
}
This is all we need to bootstrap our command line application. The following main function parses the arguments and provides the parsed `Parameters` value to the ZIO program:
def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
val clippConfig = config.fromArgsWithUsageInfo(args, Parameters.spec)
runWithParameters()
.provideCustomLayer(clippConfig)
.catchAll { _: ParserFailure => ZIO.succeed(ExitCode.failure) }
}
def runWithParameters(): ZIO[ZEnv with ClippConfig[Parameters], Nothing, ExitCode] = // ...
In `runWithParameters`, we have everything needed to initialize the logging and Kubernetes modules and perform the actual command. Before talking about the initialization though, let's take a look at how we can list the pods!
We define a data type holding all the information we want to report about each pod:
case class PodInfo(name: String, namespace: String, status: String, message: String)
The task now is to fetch all pods from Kubernetes and construct `PodInfo` values. In zio-k8s, getting the list of pods is defined as a ZIO Stream which, under the hood, sends multiple HTTP requests to Kubernetes, taking advantage of its pagination capability. Each element of this stream is a `Pod`, and we can start processing them one by one as soon as they arrive over the wire. This way, the implementation of the `list` command can look like this:
def run(format: Format) =
for {
_ <- log.debug("Executing the list command")
_ <- pods
.getAll(namespace = None)
.mapM(toModel)
.run(reports.sink(format))
.catchAll { k8sFailure =>
console.putStrLnErr(s"Failed to get the list of pods: $k8sFailure")
}
} yield ()
Let's take a look at each line!
First, `log.debug` uses the ZIO logging library. We are going to initialize logging in a way that these messages only appear if the `--verbose` option is enabled.

Then, `pods.getAll` is the ZIO Stream provided by the zio-k8s library. Not providing a specific namespace means that we are getting pods from all namespaces.

With `mapM(toModel)` we transform each `Pod` in the stream to our `PodInfo` data structure.

Finally, we `run` the stream into a sink that is responsible for displaying the `PodInfo` structures in the specified output format.
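For comparison, here is a minimal sketch of restricting the listing to a single namespace; it assumes zio-k8s's `K8sNamespace` wrapper type:

```scala
import com.coralogix.zio.k8s.client.model.K8sNamespace

// Only stream pods from the kube-system namespace instead of all namespaces
pods
  .getAll(namespace = Some(K8sNamespace("kube-system")))
  .mapM(toModel)
  .run(reports.sink(format))
```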
The `Pod` objects returned in the stream are simple case classes containing all the information available for the given resource. Most of the fields of these case classes are optional, even though in our case we can be sure that each pod has a name, a namespace, and a status. To make working with these data structures easier when such expectations hold, they feature getter methods: ZIO functions that either return the field's value or fail if it is not specified. With these, we can implement `toModel`:
def toModel(pod: Pod): IO[K8sFailure, PodInfo] =
for {
metadata <- pod.getMetadata
name <- metadata.getName
namespace <- metadata.getNamespace
status <- pod.getStatus
phase <- status.getPhase
message = status.message.getOrElse("")
} yield PodInfo(name, namespace, phase, message)
An alternative would be to just store the optional values in `PodInfo` and handle their absence in the report sink, as sketched below.
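A minimal sketch of that alternative, using only the getters shown above and ZIO's `.option` to turn failures into `None` (`LenientPodInfo` and `toLenientModel` are illustrative names, not part of the example project):

```scala
// PodInfo variant with optional fields; absence is handled by the report sink
final case class LenientPodInfo(
  name: Option[String],
  namespace: Option[String],
  status: Option[String],
  message: Option[String]
)

def toLenientModel(pod: Pod): UIO[LenientPodInfo] =
  for {
    name      <- pod.getMetadata.flatMap(_.getName).option
    namespace <- pod.getMetadata.flatMap(_.getNamespace).option
    phase     <- pod.getStatus.flatMap(_.getPhase).option
    message   <- pod.getStatus.map(_.message.getOrElse("")).option
  } yield LenientPodInfo(name, namespace, phase, message)
```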
Let's take a look at the type of the `run` function defined above:

ZIO[Pods with Console with Logging, Nothing, Unit]

The ZIO environment precisely specifies the modules used by our `run` function:
| Module | Description |
|---|---|
| `Pods` | for accessing K8s pods |
| `Console` | for printing errors to the standard error channel with `putStrLnErr` |
| `Logging` | for emitting debug logs |
The error type is `Nothing` because the function can never fail: all errors are caught and displayed to the user within the `run` function itself.
Now we can see that in order to run the `list` command in `runWithParameters`, we must provide the `Pods` and `Logging` modules to our implementation (`Console` is part of the default environment and does not need to be provided).

These modules are described by ZIO layers, which can be composed together to provide the environment for running our ZIO program. In this case, we need to define a logging layer and a Kubernetes pods client layer, and then compose the two for our `list` implementation.
Let's start with logging:
def configuredLogging(verbose: Boolean): ZLayer[Console with Clock, Nothing, Logging] = {
val logLevel = if (verbose) LogLevel.Trace else LogLevel.Info
Logging.consoleErr(logLevel) >>> initializeSlf4jBridge
}
We create a simple ZIO console logger that prints lines to the standard error channel; the enabled log level is determined by the `verbose` command line argument. As this logger writes to the console and also prints timestamps, our logging layer requires `Console with Clock` to be able to build a `Logging` module. Enabling the SLF4J bridge guarantees that logs coming from third-party libraries also get routed through ZIO logging. In our example this means that when we enable verbose logging, our kubectl plugin will log the HTTP requests made by the Kubernetes library!
The second layer we must define constructs a `Pods` module:

val pods = k8sDefault >>> Pods.live
By using `k8sDefault` we ask zio-k8s to use the default configuration chain, which first tries to load the kubeconfig and use the active context stored in it. This is exactly what kubectl does, so it is the perfect choice when writing a kubectl plugin. Other variants provide more flexibility, such as loading custom configuration with the ZIO Config library. Once we have a K8s configuration, we just feed it to the set of resource modules we need. In this example we only need to access pods; in more complex applications this would be something like `k8sDefault >>> (Pods.live ++ Deployments.live ++ ...)`.
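As a sketch, composing several resource modules on top of the same configuration could look like this (the import paths follow zio-k8s's generated package layout and may differ between versions):

```scala
import com.coralogix.zio.k8s.client.v1.pods.Pods
import com.coralogix.zio.k8s.client.apps.v1.deployments.Deployments

// One shared Kubernetes configuration feeding several resource client modules
val k8sModules = k8sDefault >>> (Pods.live ++ Deployments.live)
```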
With both layers defined, we can now provide them to our command implementation:
runCommand(parameters.command)
.provideCustomLayer(logging ++ pods)
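Putting it all together, `runWithParameters` could look roughly like the following sketch; `config.parameters` (clipp's accessor for the parsed value) and `runCommand` (the command dispatcher) are assumptions based on the snippets above:

```scala
def runWithParameters(): ZIO[ZEnv with ClippConfig[Parameters], Nothing, ExitCode] =
  for {
    parameters <- config.parameters[Parameters]          // the parsed arguments
    logging     = configuredLogging(parameters.verbose)  // logging layer
    pods        = k8sDefault >>> Pods.live               // pods client layer
    exitCode   <- runCommand(parameters.command)
                    .provideCustomLayer(logging ++ pods)
                    .as(ExitCode.success)
                    .catchAll { error =>
                      console
                        .putStrLnErr(s"Initialization failed: $error")
                        .as(ExitCode.failure)
                    }
  } yield exitCode
```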
The last thing missing is the report sink that we are running the stream of pods into. We are going to define three different sinks for the three output types.
Let's start with JSON!
def sink[T: Encoder]: ZSink[Console, Nothing, T, T, Unit] =
ZSink.foreach { (item: T) =>
console.putStrLn(item.asJson.printWith(Printer.spaces2SortKeys))
}
The JSON sink requires `Console`, and for each element `T` it converts it to JSON and pretty-prints it to the console. Note that this produces a separate JSON document per element. We could easily define a different sink that collects all the elements and produces a single valid JSON array of them:
def arraySink[T: Encoder]: ZSink[Console, Nothing, T, T, Unit] =
ZSink.collectAll.flatMap { (items: Chunk[T]) =>
ZSink.fromEffect {
console.putStrLn(Json.arr(items.map(_.asJson): _*).printWith(Printer.spaces2SortKeys))
}
}
The `T` type parameter in our example will always be `PodInfo`. By requiring it to have an implementation of circe's `Encoder` type class, we can call `.asJson` on instances of `T`, encoding them into JSON objects. We can derive these encoders automatically:
implicit val encoder: Encoder[PodInfo] = deriveEncoder
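For completeness, this uses circe's semi-automatic derivation; a self-contained version would look like:

```scala
import io.circe.Encoder
import io.circe.generic.semiauto.deriveEncoder

object PodInfo {
  // Derives an Encoder that writes PodInfo's fields as a JSON object
  implicit val encoder: Encoder[PodInfo] = deriveEncoder
}
```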
Producing YAML output is exactly the same, except that we first convert the JSON model to YAML with `asJson.asYaml`.
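A sketch of the YAML sink, assuming circe-yaml's printer syntax (`io.circe.yaml.syntax`); only the rendering step differs from the JSON sink:

```scala
import io.circe.syntax._
import io.circe.yaml.syntax._

def yamlSink[T: Encoder]: ZSink[Console, Nothing, T, T, Unit] =
  ZSink.foreach { (item: T) =>
    // Convert the value to the circe JSON model first, then render it as YAML
    console.putStrLn(item.asJson.asYaml.spaces2)
  }
```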
The third output format option is to generate ASCII tables. We implement that with the same Java library as the original post, called asciitable. In order to separate the specification of how to convert a `PodInfo` into a table from the sink implementation, we can define our own type class, similar to the JSON `Encoder`:
trait Tabular[T] {
/** Initializes a table by setting properties and adding header rows
*/
def createTableRenderer(): ZManaged[Any, Nothing, AsciiTable]
/** Adds a single item of type T to the table created with [[createTableRenderer()]]
*/
def addRow(table: AsciiTable)(item: T): UIO[Unit]
/** Adds the table's footer and renders it to a string
*/
def renderTable(table: AsciiTable): UIO[String]
}
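A possible `Tabular` instance for `PodInfo` could look like the following sketch, based on the asciitable library's `AsciiTable` API (the header names are chosen here for illustration):

```scala
import de.vandermeer.asciitable.AsciiTable
import zio.{ UIO, ZManaged }

implicit val podInfoTabular: Tabular[PodInfo] = new Tabular[PodInfo] {
  override def createTableRenderer(): ZManaged[Any, Nothing, AsciiTable] =
    ZManaged.succeed {
      val table = new AsciiTable()
      table.addRule()
      table.addRow("NAME", "NAMESPACE", "STATUS", "MESSAGE")
      table.addRule()
      table
    }

  override def addRow(table: AsciiTable)(item: PodInfo): UIO[Unit] =
    UIO(table.addRow(item.name, item.namespace, item.status, item.message)).unit

  override def renderTable(table: AsciiTable): UIO[String] =
    UIO {
      table.addRule()
      table.render()
    }
}
```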
With a `Tabular[PodInfo]` instance like the sketch above in place, we can use a generic sink for printing the result table, similar to the previous examples:
def sink[T](implicit tabular: Tabular[T]): ZSink[Console, Nothing, T, T, Unit] =
ZSink.managed[Console, Nothing, T, AsciiTable, T, Unit](tabular.createTableRenderer()) {
table => // initialize the table
ZSink.foreach(tabular.addRow(table)) <* // add each row
printResultTable[T](table) // print the result
}
def printResultTable[T](
table: AsciiTable
)(implicit tabular: Tabular[T]): ZSink[Console, Nothing, T, T, Unit] =
ZSink.fromEffect {
tabular
.renderTable(table)
.flatMap(str => console.putStrLn(str))
}
With the report sinks implemented, we have everything ready to try out our new kubectl plugin! We can compile the example to a native image and copy the resulting binary to a location on the `PATH`:
sbt nativeImage
cp target/native-image/kubectl-lp ~/bin
Then use `kubectl lp` to access our custom functions:
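For example (hypothetical invocations based on the parameter spec defined earlier):

```
kubectl lp version
kubectl lp list
kubectl lp list -o json
kubectl lp -v list -o yaml
```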