OpenSearch does not currently support external versioning in update requests. If you send an HTTP request like the one below, you get the validation error shown after it.
POST index1/_update/1?version=123&version_type=external
{
  "doc": {"title": "updated title"}
}
Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use if_seq_no and if_primary_term instead
seq_no: A counter that is incremented every time a write operation happens on a shard of the index.
primary_term: It keeps track of the current primary shard. The primary can change during failover; each time a new primary is promoted, the term is incremented and persisted in the cluster state as the “primary_term” field.
These two fields are meant for optimistic concurrency control. You get the latest “seq_no” and “primary_term” when you fetch a document from OpenSearch. When updating, you pass the same values in the “if_seq_no” and “if_primary_term” query parameters (e.g. index1/_update/1?if_seq_no=30&if_primary_term=2). OpenSearch validates them while updating the document. A mismatch means that some other write operation has happened, or the primary shard has changed, in the meantime, and the update fails with a version conflict.
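To make the check concrete, here is a minimal in-memory sketch of the compare-then-write behaviour described above. It is a toy model, not OpenSearch client code: the class SeqNoCheckSketch, the StoredDoc type and the updateIfUnchanged method are hypothetical names, and bumping a per-document counter is a simplification of how sequence numbers are actually assigned.

```java
import java.util.HashMap;
import java.util.Map;

// Toy in-memory model of the if_seq_no / if_primary_term check (assumption: simplified, not the real engine).
final class SeqNoCheckSketch {
    // Hypothetical stand-in for one stored document plus its concurrency metadata.
    static final class StoredDoc {
        final Map<String, Object> source = new HashMap<>();
        long seqNo = 0;
        long primaryTerm = 1;
    }

    // Applies the partial update only if the caller's expected values still match.
    // Returns false on a mismatch, which stands in for the version conflict.
    static boolean updateIfUnchanged(StoredDoc doc, long ifSeqNo, long ifPrimaryTerm,
                                     Map<String, Object> partial) {
        if (doc.seqNo != ifSeqNo || doc.primaryTerm != ifPrimaryTerm) {
            return false;
        }
        doc.source.putAll(partial);
        doc.seqNo++; // a successful write gets a new sequence number in this model
        return true;
    }

    public static void main(String[] args) {
        StoredDoc doc = new StoredDoc();
        // Two writers read the document at the same moment and both see seq_no=0, primary_term=1.
        boolean first = updateIfUnchanged(doc, 0, 1, Map.of("title", "from writer A"));
        boolean second = updateIfUnchanged(doc, 0, 1, Map.of("title", "from writer B"));
        System.out.println(first + " " + second + " " + doc.source.get("title"));
    }
}
```

The second writer is rejected only because the document changed after it was read; the check knows nothing about which of the two writes carries the newer data.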
Problem:
But I have a different problem to solve. We have consumers that read event messages from a queue and store them in OpenSearch, but the queue gives no ordering guarantee. If the same document is updated twice at almost the same time, a consumer might receive the older update event after the newer one and overwrite the document with stale values. To avoid that, we need to reject any update whose timestamp is older than the one already stored in the OpenSearch document. The “if_seq_no” and “if_primary_term” fields can’t handle this, because they only detect that the document has changed since it was read; they say nothing about which update is newer. We need external version support for that, but sadly OpenSearch doesn’t support it in update requests yet.
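The failure mode is easy to reproduce without OpenSearch. The sketch below simulates a consumer that applies events in arrival order with no staleness check; OutOfOrderSketch, TitleEvent and applyInArrivalOrder are hypothetical names, and the timestamps are made-up values.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy simulation of an unordered queue feeding a last-write-wins consumer.
final class OutOfOrderSketch {
    // Hypothetical event shape: a new title plus the time the change was made upstream.
    static final class TitleEvent {
        final String title;
        final long updatedAt;

        TitleEvent(String title, long updatedAt) {
            this.title = title;
            this.updatedAt = updatedAt;
        }
    }

    // Applies events strictly in arrival order, without comparing timestamps.
    static Map<String, Object> applyInArrivalOrder(List<TitleEvent> arrivals) {
        Map<String, Object> doc = new HashMap<>();
        for (TitleEvent e : arrivals) {
            doc.put("title", e.title);
            doc.put("updatedAt", e.updatedAt);
        }
        return doc;
    }

    public static void main(String[] args) {
        // The newer event (updatedAt=2000) arrives before the older one (updatedAt=1000).
        List<TitleEvent> arrivals = List.of(
                new TitleEvent("new title", 2000L),
                new TitleEvent("old title", 1000L));
        // The document ends up holding the older event's values.
        System.out.println(applyInArrivalOrder(arrivals).get("title"));
    }
}
```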
Workaround Solution:
The workaround is to write a stored script in OpenSearch. Here is a sample Painless script that can be used.
POST _scripts/update-with-ext-version
{
  "script": {
    "lang": "painless",
    "source": "if (params.updatedAt < ctx._source.updatedAt) {throw new Exception();} for (entry in params.entrySet()) {ctx._source[entry.getKey()] = entry.getValue();}"
  }
}
The “updatedAt” field holds a long value (epoch time in milliseconds). The script checks whether the “updatedAt” value passed in the request params is less than the value in the existing OpenSearch document. If it is, the script throws and the update fails; otherwise it copies every param, including “updatedAt”, into the document. Both the stored document and the request params must therefore contain “updatedAt” for this validation to work. You can tweak the script for your use case, but I hope the basic idea is clear. The HTTP request looks like this:
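POST index1/_update/1
{
  "script": {
    "id": "update-with-ext-version",
    "params": {
      "title": "updated title",
      "updatedAt": 1700000000000
    }
  }
}
Here 1700000000000 is only an example value; pass the timestamp carried by the event.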
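For readers who want to unit-test the rule outside OpenSearch, the script's logic can be approximated in plain Java on maps. This is only a sketch: StaleUpdateGuardSketch and apply are hypothetical names, IllegalStateException stands in for the exception thrown by the script, and the null check for stored documents without “updatedAt” is an addition of this sketch, not something the Painless script does.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java approximation of the stored script's staleness rule.
final class StaleUpdateGuardSketch {
    // Rejects params older than the stored document, otherwise copies every param into it.
    static void apply(Map<String, Object> source, Map<String, Object> params) {
        long incoming = ((Number) params.get("updatedAt")).longValue();
        Object existing = source.get("updatedAt");
        // Assumption of this sketch: a stored document without updatedAt accepts any update.
        if (existing != null && incoming < ((Number) existing).longValue()) {
            throw new IllegalStateException("stale update rejected");
        }
        // Same as the script's loop: every param, including updatedAt, lands in the document.
        source.putAll(params);
    }

    public static void main(String[] args) {
        Map<String, Object> source = new HashMap<>();
        source.put("title", "current title");
        source.put("updatedAt", 2000L);
        try {
            apply(source, Map.of("title", "old title", "updatedAt", 1000L));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        apply(source, Map.of("title", "new title", "updatedAt", 3000L));
        System.out.println(source.get("title"));
    }
}
```

Because the comparison is a strict less-than, an event with the same “updatedAt” as the stored document is still applied; switch to less-than-or-equal if equal timestamps should be rejected as well.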
The same scripted update request, written in Java with the opensearch-java client, would look similar to this:
import java.io.IOException;
import java.util.Map;
import org.opensearch.client.json.JsonData;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.OpenSearchException;
import org.opensearch.client.opensearch._types.Script;
import org.opensearch.client.opensearch._types.StoredScriptId;
import org.opensearch.client.opensearch.core.UpdateRequest;
import org.opensearch.client.opensearch.core.UpdateResponse;
public class OpenSearchServiceImpl {

    // OpenSearchClientFactory is an application-specific helper that returns a configured client.
    private final OpenSearchClient client = OpenSearchClientFactory.getInstance();

    public void doScriptedUpdate() throws OpenSearchException, IOException {
        String index = "index1";
        String docId = "1";

        // Reference the stored script and pass the fields to update as params.
        // updatedAt is required by the script; 1700000000000L is only an example value.
        StoredScriptId storedScript = new StoredScriptId.Builder().id("update-with-ext-version")
                .params("title", JsonData.of("updated title"))
                .params("updatedAt", JsonData.of(1700000000000L)).build();

        UpdateRequest<Map, Map> updateRequest = new UpdateRequest.Builder<Map, Map>().index(index).id(docId)
                .script(new Script.Builder().stored(storedScript).build()).build();

        // If the script throws (stale updatedAt), this call fails with an exception.
        UpdateResponse<Map> updateResponse = client.update(updateRequest, Map.class);
        System.out.println("scripted update: " + updateResponse.result().name() + " " + updateResponse.version());
    }
}