ThreadsafeFunction
Threadsafe Function is a complex concept in Node.js. Node.js runs JavaScript on a single thread, so you can't access napi_env, napi_value, and napi_ref on another thread.
napi_env, napi_value, and napi_ref are low-level concepts in Node-API, which the #[napi] macro of NAPI-RS is built on top of. NAPI-RS also provides a low-level API to access the original Node-API.
Node-API provides Threadsafe Function APIs to call JavaScript functions from other threads. These APIs are intricate, and many developers find them hard to use correctly. NAPI-RS provides a limited version of the Threadsafe Function APIs to make them easier to use:
use std::{sync::Arc, thread};

use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;

#[napi]
pub fn call_threadsafe_function(callback: ThreadsafeFunction<u32>) -> Result<()> {
  // Share the threadsafe function between the spawned threads.
  let tsfn = Arc::new(callback);
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(Ok(n), ThreadsafeFunctionCallMode::Blocking);
    });
  }
  Ok(())
}
⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
export function callThreadsafeFunction(
  callback: (err: null | Error, result: number) => void,
): void
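To see why a queue is needed at all, the idea behind a threadsafe function can be modelled with nothing but the Rust standard library. The sketch below is an analogy, not NAPI-RS or Node-API code: call_on_owner_thread is a hypothetical helper, a std::sync::mpsc channel stands in for the threadsafe function's queue, and the calling thread stands in for the JavaScript thread.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `workers` threads that each send one value to the calling thread.
/// The calling thread is the only one that ever runs `callback`.
pub fn call_on_owner_thread<F: FnMut(u32)>(workers: u32, mut callback: F) {
  let (tx, rx) = mpsc::channel::<u32>();
  let handles: Vec<_> = (0..workers)
    .map(|n| {
      let tx = tx.clone();
      thread::spawn(move || {
        // Worker threads never touch `callback`; they only enqueue data.
        tx.send(n).expect("receiver is alive");
      })
    })
    .collect();
  // Drop the original sender so the loop below ends once all workers are done.
  drop(tx);
  for value in rx {
    callback(value);
  }
  for handle in handles {
    handle.join().expect("worker panicked");
  }
}
```

Calling call_on_owner_thread(100, |n| println!("{n}")) prints every value from 0 to 99 in an unspecified order, always from the calling thread.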
Return type
The return type of the ThreadsafeFunction is the same as the return type of the JavaScript callback. You can define it with the second generic parameter of ThreadsafeFunction:
use std::thread;

use napi::threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode};
use napi_derive::napi;

#[napi]
pub fn call_threadsafe_function(callback: ThreadsafeFunction<u32, u32>) {
  thread::spawn(move || {
    callback.call_with_return_value(Ok(1), ThreadsafeFunctionCallMode::Blocking, |ret, _| {
      println!("ret: {:?}", ret); // 101
      Ok(())
    });
  });
}

import { callThreadsafeFunction } from './index.js'

callThreadsafeFunction((err, result) => {
  return result + 100
})
CallJsBackArgs
Sometimes the arguments passed to the ThreadsafeFunction are not the same as the arguments passed to the JavaScript callback. You can build the ThreadsafeFunction from a Function with CallJsBackArgs to achieve this:
use std::thread;

use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeCallContext, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;

struct Data {
  name: String,
}

#[napi]
pub fn call_threadsafe_function(callback: Function<String, ()>) -> Result<()> {
  // `build_callback` converts the Rust-side `Data` into the `String` the JavaScript callback receives.
  let tsfn = callback
    .build_threadsafe_function()
    .build_callback(|ctx: ThreadsafeCallContext<Data>| Ok(format!("Hello {}", ctx.value.name)))?;
  thread::spawn(move || {
    tsfn.call(
      Data {
        name: "John".to_string(),
      },
      ThreadsafeFunctionCallMode::NonBlocking,
    );
  });
  Ok(())
}
⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
import { callThreadsafeFunction } from './index.js'
callThreadsafeFunction((data) => {
  console.log(data) // Hello John
})
Error Status
The error status of the ThreadsafeFunction is the same as the error status of the JavaScript callback. You can define the error status in the fourth generic parameter of ThreadsafeFunction:
use std::{sync::Arc, thread};

use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;

pub struct CustomErrorStatus(String);

impl AsRef<str> for CustomErrorStatus {
  fn as_ref(&self) -> &str {
    &self.0
  }
}

impl From<Status> for CustomErrorStatus {
  fn from(value: Status) -> Self {
    CustomErrorStatus(value.to_string())
  }
}

#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, u32, u32, CustomErrorStatus>>,
) -> Result<()> {
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(
        Err(Error::new(
          CustomErrorStatus("Custom".to_owned()),
          format!("Custom error: {}", n),
        )),
        ThreadsafeFunctionCallMode::Blocking,
      );
    });
  }
  Ok(())
}
ErrorStrategy
There are two different error-handling strategies for Threadsafe Function. The strategy can be defined in the fifth generic parameter of ThreadsafeFunction:
let tsfn: ThreadsafeFunction<u32, u32, u32, Status, false> = ...
CalleeHandled: true (default behavior)
An Err from Rust code is passed as the first argument of the JavaScript callback. This behavior follows the async callback conventions from Node.js: https://nodejs.org/en/learn/asynchronous-work/javascript-asynchronous-programming-and-callbacks#handling-errors-in-callbacks. Many async APIs in Node.js are designed this way, like fs.read.
With CalleeHandled: true, you must call the ThreadsafeFunction with a Result, so that the Error is handled and passed back to the JavaScript callback:
use std::{sync::Arc, thread};

use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;

#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, u32, u32, Status, true>>,
) -> Result<()> {
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(
        Err(Error::new(
          Status::GenericFailure,
          format!("Error with: {n}"),
        )),
        ThreadsafeFunctionCallMode::Blocking,
      );
    });
  }
  Ok(())
}
⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
import { callThreadsafeFunction } from './index.js'
callThreadsafeFunction((err, result) => {
  if (err) {
    console.error(err) // [Error: Error with: 0] { code: 'GenericFailure' }
  }
  console.log(result)
})
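The error-first convention itself can be modelled in plain Rust. This is only an illustrative sketch: ErrorFirstArgs and to_error_first are hypothetical names, errors are reduced to plain strings, and the code does not reflect how NAPI-RS converts values internally.

```rust
/// Arguments as an error-first callback would see them: `(err, result)`,
/// where exactly one of the two is set.
#[derive(Debug, PartialEq)]
pub struct ErrorFirstArgs<T> {
  pub err: Option<String>,
  pub result: Option<T>,
}

/// Maps a Rust `Result` onto the two callback slots.
pub fn to_error_first<T>(value: Result<T, String>) -> ErrorFirstArgs<T> {
  match value {
    // Success: the error slot stays empty (`null` on the JavaScript side).
    Ok(v) => ErrorFirstArgs { err: None, result: Some(v) },
    // Failure: only the error slot is filled.
    Err(e) => ErrorFirstArgs { err: Some(e), result: None },
  }
}
```

In this model, Ok(1) fills only the result slot and Err("Error with: 0".to_string()) fills only the error slot, which is why the JavaScript callback checks err before using result.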
CalleeHandled: false
No Error will be passed back to the JavaScript side. You can use this strategy to avoid the Ok wrapping on the Rust side if your code will never return Err.
With this strategy, the ThreadsafeFunction doesn't need to be called with Result<T>, and the first argument of the JavaScript callback is the value from Rust, not Error | null.
With the CalleeHandled: false strategy, the ThreadsafeFunction cannot handle errors raised in the Rust threads, so you can't pass an Error back to the JavaScript side.
This strategy is only recommended if you are sure the threads where the ThreadsafeFunction is called will not return Err or panic.
use std::{sync::Arc, thread};

use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;

#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, (), u32, Status, false>>,
) -> Result<()> {
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      // No `Ok(...)` wrapper: the value is passed directly.
      tsfn.call(n, ThreadsafeFunctionCallMode::Blocking);
    });
  }
  Ok(())
}
⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️ ⬇️
export declare function callThreadsafeFunction(
  tsfn: (arg: number) => void,
): void
Weak ThreadsafeFunction
By default, the ThreadsafeFunction keeps the event loop of the thread on which it was created alive until the ThreadsafeFunction is destroyed. See Deciding whether to keep the process running.
If you don't want to keep the Node.js process/event loop alive, you can set the Weak parameter of ThreadsafeFunction to true:
use std::{sync::Arc, thread};

use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;

#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, u32, u32, Status, false, true>>,
) -> Result<()> {
  for n in 0..100 {
    let tsfn = tsfn.clone();
    thread::spawn(move || {
      tsfn.call(n, ThreadsafeFunctionCallMode::Blocking);
    });
  }
  Ok(())
}
If you call this function like this:
import { callThreadsafeFunction } from './index.js'

// logs nothing because the event loop exits immediately
// CalleeHandled is false here, so the first argument is the value, not an error
callThreadsafeFunction((n) => {
  console.log(n)
})
There won't be any logs in the console, because the event loop and the Node.js process exit immediately.
MaxQueueSize
You can set the MaxQueueSize parameter of ThreadsafeFunction to limit the number of messages in the queue.
If you call the ThreadsafeFunction in Blocking mode, the MaxQueueSize parameter will not make the call fail: the call blocks until there is room in the queue. In NonBlocking mode, the call returns immediately with Status::QueueFull when the queue is full. See napi_call_threadsafe_function for more details.
use std::{sync::Arc, thread};

use napi::{
  bindgen_prelude::*,
  threadsafe_function::{ThreadsafeFunction, ThreadsafeFunctionCallMode},
};
use napi_derive::napi;

#[napi]
pub fn call_threadsafe_function(
  tsfn: Arc<ThreadsafeFunction<u32, (), u32, Status, false, false, 1>>,
) -> Result<()> {
  thread::spawn(move || {
    for n in 0..100 {
      // NonBlocking returns right away; the status tells whether the call was queued.
      let status = tsfn.call(n, ThreadsafeFunctionCallMode::NonBlocking);
      println!("{}", status);
    }
  });
  Ok(())
}
When you call this function with heavy work in the callback, you will see the QueueFull status returned from tsfn.call:
import { callThreadsafeFunction } from './index.js'

function fib(n) {
  if (n <= 1) return n
  return fib(n - 1) + fib(n - 2)
}

callThreadsafeFunction(() => {
  fib(40)
})
This would produce the following output:
Ok
Ok
QueueFull
QueueFull
QueueFull
QueueFull
QueueFull
QueueFull
QueueFull
QueueFull
...
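The non-blocking behavior on a full queue can be reproduced with a bounded channel from the Rust standard library. The sketch below is an analogy under stated assumptions, not NAPI-RS code: non_blocking_sends is a hypothetical helper, sync_channel stands in for the threadsafe function's queue, try_send plays the role of a NonBlocking call, and nothing drains the queue, which models a JavaScript thread that stays busy.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to enqueue `count` messages into a bounded queue of capacity
/// `max_queue_size` without ever blocking. Returns `(accepted, rejected)`.
pub fn non_blocking_sends(max_queue_size: usize, count: u32) -> (u32, u32) {
  // `_rx` is kept alive so the channel stays connected, but it never receives.
  let (tx, _rx) = sync_channel::<u32>(max_queue_size);
  let mut accepted = 0;
  let mut rejected = 0;
  for n in 0..count {
    match tx.try_send(n) {
      Ok(()) => accepted += 1,
      // The counterpart of `QueueFull` in this model.
      Err(TrySendError::Full(_)) => rejected += 1,
      Err(TrySendError::Disconnected(_)) => break,
    }
  }
  (accepted, rejected)
}
```

With a capacity of 1 and 10 messages, this model accepts 1 message and rejects 9. The real output above shows more than one Ok because the JavaScript thread takes a message off the queue before it starts the heavy work.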