> ## Documentation Index
> Fetch the complete documentation index at: https://docs.streamnative.io/llms.txt
> Use this file to discover all available pages before exploring further.

# Configure state storage (Private Preview)

<Note title="Note">
  This feature is currently in private preview. If you want to try it out or have any questions, [submit a ticket](https://support.streamnative.io/hc/en-us/requests/new) to the support team.
</Note>

# Configure state storage for Pulsar Functions

StreamNative Pulsar Functions support **stateful functions** that can maintain state across function invocations. This allows you to build more complex and powerful stream processing applications.

It uses [Oxia](https://github.com/oxia-db/oxia) as a state storage interface.

States are key-value pairs, where a key is a string and its value is arbitrary binary data - counters are stored as 64-bit big-endian binary values. Keys are scoped to an individual function and shared between instances of that function.

To enable state storage for Pulsar Functions, you need to enable it explicitly when creating or updating a function by setting below arguments:

```
--custom-runtime-options '{"enableStateStore": true}'
```

<Note title="Note">
  State storage is only available for **Java** functions for now.
</Note>

## Call state APIs

Pulsar Functions expose below APIs for mutating and accessing `state`.

The following table outlines the states that can be accessed within Java functions.

| State-related API                       | Java                                   |
| --------------------------------------- | -------------------------------------- |
| [Increment counter](#increment-counter) | `incrCounter` <br />`incrCounterAsync` |
| [Retrieve counter](#retrieve-counter)   | `getCounter` <br />`getCounterAsync`   |
| [Update state](#update-state)           | `putState` <br />`putStateAsync`       |
| [Retrieve state](#retrieve-state)       | `getState` <br />`getStateAsync`       |
| [Delete state](#delete-state)           | `deleteState`                          |

## Increment counter

Use `incrCounter` to increment the counter of a given `key` by the given `amount`.
If the `key` does not exist, a new key is created.

```java theme={null}
    /**
     * Increment the built-in distributed counter referred by key
     * @param key The name of the key
     * @param amount The amount to be incremented
     */
    void incrCounter(String key, long amount);
```

To asynchronously increment the counter, you can use `incrCounterAsync`.

```java theme={null}
     /**
     * Increment the built-in distributed counter referred by key
     * but dont wait for the completion of the increment operation
     *
     * @param key The name of the key
     * @param amount The amount to be incremented
     */
    CompletableFuture<Void> incrCounterAsync(String key, long amount);
```

### Retrieve counter

Use `getCounter` to retrieve the counter of a given `key` mutated by `incrCounter`.

```java theme={null}
    /**
     * Retrieve the counter value for the key.
     *
     * @param key name of the key
     * @return the amount of the counter value for this key
     */
    long getCounter(String key);
```

To asynchronously retrieve the counter mutated by `incrCounterAsync`, you can use `getCounterAsync`.

```java theme={null}
     /**
     * Retrieve the counter value for the key, but don't wait
     * for the operation to be completed
     *
     * @param key name of the key
     * @return the amount of the counter value for this key
     */
    CompletableFuture<Long> getCounterAsync(String key);
```

### Update state

Besides the `counter` API, Pulsar also exposes a general key/value API for functions to store and update the state of a given `key`.

```java theme={null}
    /**
     * Update the state value for the key.
     *
     * @param key name of the key
     * @param value state value of the key
     */
    void putState(String key, ByteBuffer value);
```

To asynchronously update the state of a given `key`, you can use `putStateAsync`.

```java theme={null}
    /**
     * Update the state value for the key, but don't wait for the operation to be completed
     *
     * @param key name of the key
     * @param value state value of the key
     */
    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
```

### Retrieve state

Use `getState` to retrieve the state of a given `key`.

```java theme={null}
    /**
     * Retrieve the state value for the key.
     *
     * @param key name of the key
     * @return the state value for the key.
     */
    ByteBuffer getState(String key);
```

To asynchronously retrieve the state of a given `key`, you can use `getStateAsync`.

```java theme={null}
    /**
     * Retrieve the state value for the key, but don't wait for the operation to be completed
     *
     * @param key name of the key
     * @return the state value for the key.
     */
    CompletableFuture<ByteBuffer> getStateAsync(String key);
```

### Delete state

```java theme={null}
    /**
     * Delete the state value for the key.
     *
     * @param key   name of the key
     */
    void deleteState(String key);
```

## Query state via CLI

You can also query function state using CLI commands. This is useful for debugging and monitoring stateful functions.

```bash theme={null}
bin/pulsar-admin functions querystate \
    --tenant <tenant> \
    --namespace <namespace> \
    --name <function-name> \
    --key <state-key> \
    [---watch]
```

If `--watch` is specified, the CLI tool keeps running to get the latest value of the provided `state-key`.

## Example

The example of `WordCountFunction` demonstrates how `state` is stored within Pulsar Functions.

1. The function splits the received `String` into multiple words using regex `\\.`.
2. For each `word`, the function increments `counter` by 1 via `incrCounter(key, amount)`.

   ```java theme={null}
   import org.apache.pulsar.functions.api.Context;
   import org.apache.pulsar.functions.api.Function;

   import java.util.Arrays;

   public class WordCountFunction implements Function<String, Void> {
       @Override
       public Void process(String input, Context context) throws Exception {
           Arrays.asList(input.split("\\.")).forEach(word -> context.incrCounter(word, 1));
           return null;
       }
   }
   ```
