Handling data received from SpatialOS
It is up to each worker to decide how the basic event loop should be structured, depending on its
requirements. The worker should call improbable.worker.Connection.getOpList
to get a list of
“operations” (for example a new entity, or a component update) from SpatialOS that have been sent since the
last time the function was called. This improbable.worker.OpList
then needs to be passed to a
dispatcher in order to invoke user-provided callbacks.
The following snippet shows a (very) simple example implementation of an event loop that processes operations from SpatialOS 60 times per second:
private static final int FRAMES_PER_SECOND = 60;

private static void runEventLoop(Connection connection, Dispatcher dispatcher) {
    // Time budget for one frame (about 17 ms at 60 frames per second).
    java.time.Duration maxWait = java.time.Duration.ofMillis(Math.round(1000.0 / FRAMES_PER_SECOND));
    while (true) {
        long startTime = System.nanoTime();
        OpList opList = connection.getOpList(0 /* non-blocking */);
        // Invoke callbacks.
        dispatcher.process(opList);
        // Do other work here...
        long stopTime = System.nanoTime();
        // Sleep for whatever is left of the frame budget, if anything.
        java.time.Duration waitFor = maxWait.minusNanos(stopTime - startTime);
        try {
            Thread.sleep(Math.max(waitFor.toMillis(), 0));
        } catch (InterruptedException e) {
            // Restore the interrupt flag before aborting the loop.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }
}
If a positive timeout (in milliseconds) is provided to getOpList, the function blocks until
there is at least one operation to return, or until the timeout has been exceeded. A timeout of 0,
as used in the loop above, makes the call non-blocking.
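The timeout behaviour can be modelled without the SDK. The sketch below is only an illustration: FakeConnection, deliver and the String operations are hypothetical stand-ins (they are not SDK types), and the class simply mimics the semantics described above using a java.util.concurrent queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in that models the timeout semantics; NOT the SDK's Connection.
final class FakeConnection {
    private final BlockingQueue<String> pending = new LinkedBlockingQueue<>();

    // Simulates an operation arriving from SpatialOS.
    void deliver(String op) {
        pending.add(op);
    }

    // timeoutMillis == 0: return immediately with whatever is queued.
    // timeoutMillis > 0: wait for the first operation or until the timeout expires.
    List<String> getOpList(long timeoutMillis) {
        List<String> ops = new ArrayList<>();
        if (timeoutMillis > 0) {
            try {
                String first = pending.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (first == null) {
                    return ops; // Timed out: nothing arrived.
                }
                ops.add(first);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return ops;
            }
        }
        pending.drainTo(ops); // Collect everything else that is already queued.
        return ops;
    }
}
```

A blocking call suits a worker that has nothing else to do between operations, while the non-blocking form suits a fixed-rate loop that also does other per-frame work.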
Note that all callbacks provided to the improbable.worker.Dispatcher are invoked only when
improbable.worker.Dispatcher.process is called, and only on the thread that is calling it.
The user therefore has complete control over callback timing and threading decisions.
If the connection fails (that is, when Connection.isConnected() returns false), any error messages
explaining the cause are returned by getOpList(). Make sure to process this list in order to
receive all available diagnostic information.
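One way to make sure the final list is not skipped is to read the connection state before fetching the list, and to leave the loop only after that list has been handled. The sketch below models this with hypothetical stand-ins: FailingConnection, fail, deliver and the "Disconnect: ..." string are invented for illustration and are not SDK types or messages.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of a failing connection; NOT the SDK's Connection class.
final class FailingConnection {
    private final List<String> pending = new ArrayList<>();
    private boolean connected = true;

    void deliver(String op) {
        pending.add(op);
    }

    // Simulates a failure: in this model the cause is queued as one more operation.
    void fail(String reason) {
        connected = false;
        pending.add("Disconnect: " + reason);
    }

    boolean isConnected() {
        return connected;
    }

    List<String> getOpList() {
        List<String> ops = new ArrayList<>(pending);
        pending.clear();
        return ops;
    }

    // Handles op lists until the connection is gone, including the final list,
    // which is where the failure reason shows up in this model.
    // A real loop would also sleep or block between iterations.
    static void runUntilDisconnected(FailingConnection connection, Consumer<String> handler) {
        boolean wasConnected;
        do {
            wasConnected = connection.isConnected();
            for (String op : connection.getOpList()) {
                handler.accept(op);
            }
        } while (wasConnected);
    }
}
```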
Dispatcher callbacks
Several kinds of callbacks can be registered on the improbable.worker.Dispatcher. Each method takes a
Callback<Param>, where the parameter type Param depends on the particular kind of callback being
registered. The following table has the details:
Method | Parameter type (and fields) | Invoked when… |
---|---|---|
onDisconnect | improbable.worker.Ops.Disconnect (String reason) | the Connection is no longer connected and can no longer be used. Check the log for errors relating to the disconnection. |
onFlagUpdate | improbable.worker.Ops.FlagUpdate (String name, improbable.collections.Option<String> value) | a worker flag has been created or deleted, or its value has changed. |
onLogMessage | improbable.worker.Ops.LogMessage (improbable.worker.LogLevel level, String message) | the SDK issues a log message for the worker to print. This does not include messages sent using Connection.sendLogMessage. |
onMetrics | improbable.worker.Ops.Metrics (improbable.worker.Metrics metrics) | the SDK reports built-in internal metrics. |
onCriticalSection | improbable.worker.Ops.CriticalSection (boolean inCriticalSection) | a critical section is about to be entered or has just been left. |
onAddEntity | improbable.worker.Ops.AddEntity (improbable.worker.EntityId entityId) | an entity is added to the worker’s view of the simulation. |
onRemoveEntity | improbable.worker.Ops.RemoveEntity (improbable.worker.EntityId entityId) | an entity is removed from the worker’s view of the simulation. |
onReserveEntityIdResponse | improbable.worker.Ops.ReserveEntityIdResponse (improbable.worker.RequestId<ReserveEntityIdRequest> requestId, improbable.worker.StatusCode statusCode, String message, improbable.collections.Option<improbable.worker.EntityId> entityId) | the worker has received a response for an entity ID reservation it had requested previously. |
onReserveEntityIdsResponse | improbable.worker.Ops.ReserveEntityIdsResponse (improbable.worker.RequestId<ReserveEntityIdsRequest> requestId, improbable.worker.StatusCode statusCode, String message, improbable.collections.Option<improbable.worker.EntityId> entityId, int numberOfEntityIds) | the worker has received a response for a reservation of an entity ID range it had requested previously. |
onCreateEntityResponse | improbable.worker.Ops.CreateEntityResponse (improbable.worker.RequestId<CreateEntityRequest> requestId, improbable.worker.StatusCode statusCode, String message, improbable.collections.Option<improbable.worker.EntityId> entityId) | the worker has received a response for an entity creation it had requested previously. |
onDeleteEntityResponse | improbable.worker.Ops.DeleteEntityResponse (improbable.worker.RequestId<DeleteEntityRequest> requestId, improbable.worker.EntityId entityId, improbable.worker.StatusCode statusCode, String message) | the worker has received a response for an entity deletion it had requested previously. |
onEntityQueryResponse | improbable.worker.Ops.EntityQueryResponse (improbable.worker.RequestId<EntityQueryRequest> requestId, improbable.worker.StatusCode statusCode, String message, int resultCount, Map<improbable.worker.EntityId, improbable.worker.Entity> result) | the worker has received a response for an entity query it had requested previously. |
onAddComponent<C> | improbable.worker.Ops.AddComponent<C> (improbable.worker.EntityId entityId, improbable.worker.IComponentData<C> data) | a component is added to an existing entity in the worker’s view of the simulation. |
onRemoveComponent<C> | improbable.worker.Ops.RemoveComponent (improbable.worker.EntityId entityId) | a component is removed from an existing entity in the worker’s view of the simulation. |
onAuthorityChange<C> | improbable.worker.Ops.AuthorityChange (improbable.worker.EntityId entityId, improbable.worker.Authority authority) | the worker’s authority state over a component is changed to Authoritative, Not Authoritative or Authority Loss Imminent. |
onComponentUpdate<C> | improbable.worker.Ops.ComponentUpdate<C> (improbable.worker.EntityId entityId, improbable.worker.IComponentUpdate<C> update) | a component for an entity in the worker’s view of the simulation has been updated. |
onCommandRequest<C> | improbable.worker.Ops.CommandRequest<C> (improbable.worker.RequestId<IncomingCommandRequest> requestId, improbable.worker.EntityId entityId, int timeoutMillis, String callerWorkerId, List<String> callerAttributeSet, improbable.worker.CommandRequest<C> request) | the worker has received a command request for a component on an entity over which it has write access authority. |
onCommandResponse<C> | improbable.worker.Ops.CommandResponse<C> (improbable.worker.RequestId<OutgoingCommandRequest> requestId, improbable.worker.EntityId entityId, improbable.worker.StatusCode statusCode, improbable.collections.Option<improbable.worker.CommandResponse<C>> response) | the worker has received a command response for a request it issued previously. |
Here’s an example of registering callbacks:
private static java.util.List<Long> registerCallbacks(Connection connection, Dispatcher dispatcher) {
    java.util.List<Long> callbackKeys = new java.util.ArrayList<Long>();
    long callbackKey = dispatcher.onAddEntity(addEntity -> {
        // Do something with addEntity.entityId.
    });
    callbackKeys.add(callbackKey);
    callbackKey = dispatcher.onComponentUpdate(Switch.COMPONENT, op -> {
        Switch.Update update = op.update;
        for (SwitchToggled toggleEvent : update.getToggled()) {
            System.out.println("Switch has been toggled at " + toggleEvent.getTime());
        }
    });
    callbackKeys.add(callbackKey);
    return callbackKeys;
}
To unregister a callback, call remove() with the long key returned from the registration method:
private static void unregisterCallbacks(Dispatcher dispatcher, java.util.List<Long> callbackKeys) {
    for (Long key : callbackKeys) {
        dispatcher.remove(key);
    }
}
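The key-based pattern above can be illustrated with a small registry. This is a minimal sketch, not the SDK's implementation: KeyedCallbacks, register and dispatch are hypothetical names, and returning a boolean from remove is a choice made for this model only.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of key-based registration; NOT the SDK's Dispatcher.
final class KeyedCallbacks<T> {
    // LinkedHashMap keeps callbacks in registration order.
    private final Map<Long, Consumer<T>> callbacks = new LinkedHashMap<>();
    private long nextKey = 0;

    // Registers a callback and returns the key needed to remove it later.
    long register(Consumer<T> callback) {
        long key = nextKey++;
        callbacks.put(key, callback);
        return key;
    }

    // Returns true if a callback was registered under this key (model-specific choice).
    boolean remove(long key) {
        return callbacks.remove(key) != null;
    }

    void dispatch(T op) {
        // Iterate over a copy so a callback may unregister itself safely.
        for (Consumer<T> callback : new ArrayList<>(callbacks.values())) {
            callback.accept(op);
        }
    }
}
```

Keeping the returned keys in a list, as registerCallbacks does, is what makes it possible to remove exactly the callbacks a given piece of code registered.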
Using the View
improbable.worker.View is a subclass of improbable.worker.Dispatcher that automatically maintains
the current state of the worker’s view of the simulated world. It has a field called entities,
which is a map from entity ID to improbable.worker.Entity objects. When a View processes an OpList,
it automatically updates this map as appropriate. Any user-defined callbacks are also invoked,
as per the Dispatcher contract.
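The bookkeeping described above can be pictured as a map that is updated as operations arrive. The sketch below is a simplified model under stated assumptions: ViewSketch and its methods are hypothetical, entity IDs are plain long values, and component data is reduced to a String, none of which matches the SDK's actual types.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the bookkeeping a View performs; NOT the SDK's View class.
final class ViewSketch {
    // Entity ID -> (component name -> component data), standing in for `entities`.
    final Map<Long, Map<String, String>> entities = new HashMap<>();

    void addEntity(long entityId) {
        entities.put(entityId, new HashMap<>());
    }

    void removeEntity(long entityId) {
        entities.remove(entityId);
    }

    void addComponent(long entityId, String component, String data) {
        Map<String, String> entity = entities.get(entityId);
        if (entity != null) {
            entity.put(component, data);
        }
    }

    void updateComponent(long entityId, String component, String data) {
        Map<String, String> entity = entities.get(entityId);
        if (entity != null) {
            entity.replace(component, data); // Only updates components that exist.
        }
    }

    void removeComponent(long entityId, String component) {
        Map<String, String> entity = entities.get(entityId);
        if (entity != null) {
            entity.remove(component);
        }
    }
}
```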
Dispatcher invocation order
The improbable.worker.Dispatcher handles the operations in an improbable.worker.OpList in
the order in which they were received over the connection. The next operation in the OpList is
handled only after the previous operation has been processed. For a given operation, user-defined
callbacks are invoked in the order in which they were registered with the Dispatcher.
The exceptions to this rule are callbacks for operations that come in “start” and “end” pairs. When an “end” operation is received, callbacks are invoked in the reverse order of their registration. This means that pairs of callbacks registered for the “start” and “end” operations can correctly depend on resources managed by pairs of callbacks registered previously, similar to the usual construction and destruction order found in, for example, C++.
For example, Ops.AddEntity and Ops.AddComponent callbacks are invoked in the order they were
registered with the improbable.worker.Dispatcher, while Ops.RemoveEntity and Ops.RemoveComponent
callbacks are invoked in reverse order. Similarly, Ops.AuthorityChange callbacks are invoked in the
order they were registered when write access authority is granted, but in reverse order when write
access authority is revoked (or authority loss is imminent). Ops.CriticalSection callbacks are
invoked in the usual order when a critical section is entered, but in reverse order when a critical
section is left. Ops.FlagUpdate callbacks are invoked in the order they were registered when a
worker flag is created or its value is changed, but in reverse order when a worker flag is deleted.
In particular, this invocation order has the side effect that, inside an Ops.RemoveComponent
callback, the final state of the removed component can still be inspected in the
improbable.worker.View.
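The ordering rule, and why it leaves removed state visible to later-registered callbacks, can be shown with a small model. This is a sketch under stated assumptions: PairedDispatchSketch and its methods are hypothetical, and the first pair of callbacks merely plays the role of the View's own bookkeeping.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical model of paired "start"/"end" callbacks; NOT the SDK's Dispatcher.
final class PairedDispatchSketch {
    private final List<Consumer<Long>> addCallbacks = new ArrayList<>();
    private final List<Consumer<Long>> removeCallbacks = new ArrayList<>();

    void onAdd(Consumer<Long> callback) {
        addCallbacks.add(callback);
    }

    void onRemove(Consumer<Long> callback) {
        removeCallbacks.add(callback);
    }

    // "Start" operations: callbacks run in registration order.
    void dispatchAdd(long id) {
        for (Consumer<Long> callback : addCallbacks) {
            callback.accept(id);
        }
    }

    // "End" operations: callbacks run in reverse registration order.
    void dispatchRemove(long id) {
        for (int i = removeCallbacks.size() - 1; i >= 0; i--) {
            removeCallbacks.get(i).accept(id);
        }
    }

    static List<String> demo() {
        PairedDispatchSketch dispatcher = new PairedDispatchSketch();
        Map<Long, String> state = new HashMap<>();
        List<String> log = new ArrayList<>();
        // Registered first: plays the role of the View's bookkeeping.
        dispatcher.onAdd(id -> state.put(id, "data-" + id));
        dispatcher.onRemove(id -> state.remove(id));
        // Registered second: user code that reads the maintained state.
        dispatcher.onAdd(id -> log.add("added " + id + " sees " + state.get(id)));
        dispatcher.onRemove(id -> log.add("removed " + id + " sees " + state.get(id)));
        dispatcher.dispatchAdd(7L);
        dispatcher.dispatchRemove(7L);
        return log;
    }
}
```

In this model the user's remove callback runs before the bookkeeping callback that deletes the entry, so it still sees "data-7"; with forward ordering on removal it would see null instead.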
Critical sections
The protocol supports the concept of critical sections. A critical section is a block of operations that should be processed atomically by the worker. For example, a set of entities representing a stack of cubes should be added to the scene in its entirety before any physics simulation is done. Note that this notion of a critical section does not provide any guarantees.
The Ops.CriticalSection callback is invoked with inCriticalSection == true just before
the first operation of the critical section is delivered, and is invoked again with
inCriticalSection == false just after the last operation of the critical section is delivered.
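One way a worker might use these two notifications is to buffer operations while a critical section is open and apply them together once it closes. The sketch below is an assumption about how such buffering could look, not SDK behaviour: CriticalSectionBuffer, onOp and appliedBatches are hypothetical names, and operations are reduced to Strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffering helper; NOT part of the SDK.
final class CriticalSectionBuffer {
    private final List<String> buffered = new ArrayList<>();
    private final List<List<String>> appliedBatches = new ArrayList<>();
    private boolean inCriticalSection = false;

    // Mirrors the two invocations of the critical section callback.
    void onCriticalSection(boolean entering) {
        inCriticalSection = entering;
        if (!entering) {
            flush(); // Section closed: apply everything collected as one batch.
        }
    }

    // Called for every other operation.
    void onOp(String op) {
        buffered.add(op);
        if (!inCriticalSection) {
            flush(); // Outside a critical section, apply immediately.
        }
    }

    private void flush() {
        if (!buffered.isEmpty()) {
            appliedBatches.add(new ArrayList<>(buffered));
            buffered.clear();
        }
    }

    List<List<String>> appliedBatches() {
        return appliedBatches;
    }
}
```

With the stack-of-cubes example, all the cube entities would end up in a single batch, so a physics step would never run against a partially added stack.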
Worker flags
To access the value of a worker flag, use the getWorkerFlag method of a Connection:
Option<String> getWorkerFlag(String flagName)
For example:
private static final double DEFAULT_WORK_SPEED = 10;

private static void getSpeedWorkerFlag(Connection connection) {
    improbable.collections.Option<String> speedFlag = connection.getWorkerFlag("mycompany_theproject_speed");
    if (speedFlag.isPresent()) {
        setWorkSpeed(Double.parseDouble(speedFlag.get()));
    } else {
        setWorkSpeed(DEFAULT_WORK_SPEED);
    }
}
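Because a flag value arrives as a String, Double.parseDouble throws a NumberFormatException if the flag is set to something that is not a number. The sketch below shows one possible way to guard against that. It is an illustration only: WorkerFlagSketch and parseSpeed are hypothetical, java.util.Optional stands in for improbable.collections.Option, and falling back to the default on a malformed value is an assumption about the desired behaviour.

```java
import java.util.Optional;

// Hypothetical helper; java.util.Optional stands in for improbable.collections.Option.
final class WorkerFlagSketch {
    static final double DEFAULT_WORK_SPEED = 10;

    // Returns the parsed flag value, or the default if the flag is absent or malformed.
    static double parseSpeed(Optional<String> flag) {
        if (!flag.isPresent()) {
            return DEFAULT_WORK_SPEED;
        }
        try {
            return Double.parseDouble(flag.get());
        } catch (NumberFormatException e) {
            return DEFAULT_WORK_SPEED; // Malformed value: fall back to the default.
        }
    }
}
```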