Concurrency of long-running operations

As mentioned in my monthly report, I have started working on adding support for concurrent long-running operations for the 4.0 version.

Here is a short video demonstrating one use case:

Why work on this now?

The tasks that are coming up in this project rely on one crucial architectural decision: how to represent the columnar dependencies of operations at the core of the tool. We need a representation which enables a wide range of use cases, so it's not easy to get it right. A waterfall-like development process would consist of first designing this new metadata format, then migrating all the operations to expose such information, and finally implementing all the use cases enabled by that. But of course that runs the risk that the architecture chosen to represent the dependencies does not quite address the desired use cases, which would mean migrating a lot of operations again to expose more metadata later on.

Instead, I am trying another approach: experiment with introducing just enough metadata for one use case at a time, and see where it takes me. The additional metadata I am introducing is always optional, because we need to cater for the needs of special operations whose dependencies cannot be isolated clearly (such as the transpose operations). This means that most operations can be left as is in this first prototyping phase: they are just treated as black boxes by the execution engine. I am therefore concentrating my testing around a sample of operations.

Multiple types of concurrency

I have identified two types of concurrency worth implementing in the tool:

  • a row-wise concurrency, where the rows are streamed through the pipeline of operations. This is a form of concurrency, because the operations are running simultaneously: the first row of the table can make it through the entire pipeline before the 100th row has even started to be processed by the first operation. This form of concurrency is implemented in a lot of ETL tools, like Knime or Apache NiFi. When the records mode is used, this would become a record-wise concurrency.
  • a column-wise concurrency, where operations which work on different columns are executed independently. If you fetch URLs from one column and reconcile another, the two processes should be able to carry on independently.

In my opinion, both types of concurrency would be useful in OpenRefine. The row-wise (or record-wise) concurrency is useful on large datasets, essentially making it possible to compose your data cleaning workflow by looking at a sample of the data and letting the execution follow on the rest of the dataset. But with this concurrency only, the operations you execute towards the end of the pipeline can get held up by any previous slow operations, even though the outcome of those is not actually needed to execute the later operations. So this can lead to unnecessary waits that a columnar analysis can avoid.

For now I have been focusing on the column-wise concurrency, because it relates to those wider reproducibility use cases and is therefore helpful to identify the representation of column dependencies that is needed. But I do plan to implement row/record-wise concurrency as well.

Overview of the dependency representation so far

I have modified the column metadata to record, for each column:

  • at which step in the history the column was last modified (lastModified)
  • what name the column had at this step (originalName)

With those simple ingredients, when starting a long-running operation (that is to say an operation which must persist the data it fetches or computes), we can do the following (roughly):

  • analyze which columns are read by that long-running operation
  • for each of those columns, read from the column metadata the pair (lastModified, originalName)
  • find the earliest position in the history where all those (lastModified, originalName) pairs are present in the column model
  • run the operation from that position (instead of the current one) as soon as the grid at that position is fully computed
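The search described in these steps can be sketched as follows. This is only an illustration under assumptions: the ColumnMeta class, the representation of the history as one list of column metadata per position, and the method name earliestPosition are all hypothetical, and the check that row/record ids are preserved along the way is left out.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class HistoryPositionSketch {

    /** Hypothetical stand-in for the column metadata described above. */
    static final class ColumnMeta {
        final String name;          // current name of the column
        final int lastModified;     // history step at which the column was last modified
        final String originalName;  // name the column had at that step

        ColumnMeta(String name, int lastModified, String originalName) {
            this.name = name;
            this.lastModified = lastModified;
            this.originalName = originalName;
        }

        /** Identifies the column contents, independently of later renames. */
        String key() {
            return lastModified + "/" + originalName;
        }
    }

    /**
     * history.get(p) is the column model at position p (assumed representation).
     * Returns the earliest position whose column model contains every
     * (lastModified, originalName) pair read by the operation.
     */
    static int earliestPosition(List<List<ColumnMeta>> history, int current, Set<String> readColumns) {
        Set<String> required = new HashSet<>();
        for (ColumnMeta column : history.get(current)) {
            if (readColumns.contains(column.name)) {
                required.add(column.key());
            }
        }
        for (int position = 0; position <= current; position++) {
            Set<String> present = new HashSet<>();
            for (ColumnMeta column : history.get(position)) {
                present.add(column.key());
            }
            if (present.containsAll(required)) {
                return position;
            }
        }
        return current;
    }

    /** Position 1 transforms column A, position 2 renames B to C. */
    static List<List<ColumnMeta>> example() {
        return List.of(
            List.of(new ColumnMeta("A", 0, "A"), new ColumnMeta("B", 0, "B")),
            List.of(new ColumnMeta("A", 1, "A"), new ColumnMeta("B", 0, "B")),
            List.of(new ColumnMeta("A", 1, "A"), new ColumnMeta("C", 0, "B")));
    }

    public static void main(String[] args) {
        // An operation reading C can start from the initial grid: prints 0
        System.out.println(earliestPosition(example(), 2, Set.of("C")));
        // An operation reading A must wait for the transform at position 1: prints 1
        System.out.println(earliestPosition(example(), 2, Set.of("A")));
    }
}
```

In this toy history, an operation reading only C does not have to wait for the transform on A, which is the kind of wait the column-wise concurrency is meant to avoid.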

When looking for an earlier state in the history from which to compute the operation, we also ensure that the row / record ids are preserved by the operations browsed. This is done thanks to the existing row / record preservation metadata (introduced in the first phase of the project).

For now, it is the responsibility of any transformation deriving a new Grid to update those fields in the new grid. I am anticipating that there should be better ways to ensure that those get updated by the architecture itself. At least for now, if an operation declares it depends only on a certain set of columns, it is only given access to those columns during the computation, so that is already one useful guarantee. I think it should be possible to do something similar the other way around, for modified columns.

Columnar reading of project data

Looking at the turn things are taking, it seems increasingly easy to support columnar reading of the project grid. By this I mean, only reading from disk the columns that are needed for a particular computation. This could potentially even apply to displaying the grid, only reading the columns which are visible onscreen. I would not be surprised if the requirements for such a feature end up being closely aligned with the reproducibility requirements. But because this is only a performance optimization, which does not seem crucial for most of our current users, this is not something I plan to invest a lot of energy into (for now). I just have it on my radar and will keep it in mind in architectural changes.

I have worked on this aspect and have a better architecture to propose now.
I have convinced myself that all of the columnar metadata associated with an operation (representing which columns it reads, writes, deletes or copies) should be enforced by the core platform directly.
This means, if an operation declares that it depends only on certain columns, it will only be able to read data from those columns (which was already the case as mentioned above), but also that it is only able to write data to the columns it declares as such. A corollary of that is that operations don't have to manually mark the columns they modify as such, as it gets handled by the core tool instead.

Going for such an approach (instead of trusting the operations to do what they say) should have multiple benefits:

  • avoid bugs: mismatches between what the operation declares and what it does could mean that it does not execute as expected, for instance if it was allowed to run in parallel with another operation while it should have waited for the results of that operation instead. This is particularly important because extensions are able to define new operations, whose correctness we have no control over.
  • make it easier to implement columnar reading of project data later on. Say we want to read a selection of the columns from a project's current state (for instance the columns that are present in the view, or the columns that are required for facet computation). The project's current state is derived from the initial state by a series of operations. Assuming all of those are row/record wise and declare the columns they depend on and touch, then we are able to compute the total set of columns that should be read to compute all the required operations to generate the required set of columns.
  • make it possible to optimize the computation of rows/records with a pipelining approach. At the moment, when we apply a new row/record wise operation that is computed lazily (such as trimming whitespace or making judgments on reconciled cells), this adds another map operation on top of the existing grid, which will then be executed every time we need to access any data in the project grid. As the user keeps applying more operations, this stack of map operations grows and this can slow down the access to the project data. This slowdown can be avoided by caching an intermediate grid in memory or on disk, but this also comes at a cost. So the idea behind "pipelining" is to merge many consecutive map operations into just one, which computes the composite function. If operations have clear columnar dependencies then we have an opportunity to optimize this composite function to make it run faster. This is a standard technique that is used in platforms like Spark and that we could try implementing if the need/interest arises. But for now I would say it would be premature optimization.
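To make the columnar reading point above a bit more concrete, here is a rough sketch of how the set of columns to read from the initial state could be computed by walking the history backwards. The Op class and the columnsToRead method are hypothetical, and the sketch assumes that every operation is row/record wise and declares both the columns it reads and the columns it writes (renames, deletions and opaque operations are ignored).

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ColumnPruningSketch {

    /** Hypothetical summary of an operation's declared columnar scope. */
    static final class Op {
        final Set<String> reads;
        final Set<String> writes;

        Op(Set<String> reads, Set<String> writes) {
            this.reads = reads;
            this.writes = writes;
        }
    }

    /**
     * Returns the columns of the initial grid that must be read
     * to compute the wanted columns of the final grid.
     */
    static Set<String> columnsToRead(List<Op> history, Set<String> wanted) {
        Set<String> needed = new HashSet<>(wanted);
        // Walk the history from the most recent operation back to the first one
        for (int i = history.size() - 1; i >= 0; i--) {
            Op op = history.get(i);
            boolean relevant = false;
            for (String written : op.writes) {
                if (needed.contains(written)) {
                    relevant = true;
                }
            }
            if (relevant) {
                // The operation produces a needed column: we need its inputs instead
                needed.removeAll(op.writes);
                needed.addAll(op.reads);
            }
            // Otherwise the operation can be skipped entirely
        }
        return needed;
    }

    /** Initial columns: url, name, city. */
    static List<Op> example() {
        return List.of(
            new Op(Set.of("url"), Set.of("html")),    // fetch URLs into a new column
            new Op(Set.of("name"), Set.of("name")),   // trim whitespace in place
            new Op(Set.of("html"), Set.of("title"))); // derive a column from the fetched pages
    }

    public static void main(String[] args) {
        // Displaying only "title" and "city" requires reading "url" and "city"
        System.out.println(columnsToRead(example(), Set.of("title", "city")));
    }
}
```

In this example the trimming operation on "name" never needs to be evaluated, and the "name" column never needs to be read from disk.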

How to represent the columnar dependencies of OpenRefine operations?

Here is an overview of how I propose to model those. First, as is currently the case, all row/record-wise operations come with an engineConfig which specifies whether the operation is run in rows or records mode, as well as the facets which restrict its execution to a subset of the dataset. On top of that, I am introducing new metadata fields:

Column dependencies

By declaring a list of column names as column dependencies, the operation will be run on rows or records which contain only those columns, in the specified order.

An operation can opt out of this by not declaring this list of column names, in which case it will still be fed with full rows. In that scenario, the operation can potentially read information from any columns. It is useful to let operations still have access to that, even if the operation does not actually depend on all columns: we might just not be able to determine those dependencies statically.

Column insertions / replacements

By declaring a list of column insertions or replacements, an operation is able to isolate the columns that it modifies. Each of those comes with:

  • the name of the column
  • a boolean flag indicating whether the column is inserted or replaces another column
  • the name of the column to the right of which the column should be inserted, or if it is a replacement, the name of the column to replace
  • if the column is directly obtained by copying another column, the name of that column (otherwise the column is computed by the operation)
  • an optional reconciliation configuration to store in the column metadata

This rather long list of fields is there to make it possible to express most of OpenRefine's current row/record wise operations as a series of such insertions (see the examples below).
When an operation declares insertions / replacements, it is expected to produce rows which only contain those columns (excluding those which are directly copied from other columns). The core of the tool takes care of inserting those values in the full rows by itself.

Note the choice to determine the positions of the inserted columns based on the name of the column it should be inserted after, instead of using column indices. I think this should make for better reproducibility, given how the majority of OpenRefine operations work. Say you came up with a series of operations which let you break down a date represented in a particular format into three columns (day / month / year), inserted after the original column. If you want to reuse this function on another dataset, it is likely that the date column to operate on will not always be at the same position in the table. Therefore it's better to define the position of the new columns to be relative to the original column, rather than using fixed indices. This seems to be a pattern followed by most OpenRefine operations: "add column based on this column", "split column", "add column based on reconciled values" all create new columns after the column they are run on.
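As a small illustration of why name-based positions generalize better than indices, here is a sketch of applying insertions and replacements to a list of column names. The Insertion class and its fields are hypothetical simplifications of the metadata listed above (the copy source and reconciliation configuration are left out).

```java
import java.util.ArrayList;
import java.util.List;

class ColumnInsertionSketch {

    /** Hypothetical, simplified version of an insertion / replacement declaration. */
    static final class Insertion {
        final String name;     // name of the new column
        final String anchor;   // column to insert after, or column to replace
        final boolean replace; // true for a replacement, false for an insertion

        Insertion(String name, String anchor, boolean replace) {
            this.name = name;
            this.anchor = anchor;
            this.replace = replace;
        }
    }

    /** Applies the insertions in order and returns the new list of column names. */
    static List<String> apply(List<String> columns, List<Insertion> insertions) {
        List<String> result = new ArrayList<>(columns);
        for (Insertion insertion : insertions) {
            int index = result.indexOf(insertion.anchor);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown column: " + insertion.anchor);
            }
            if (insertion.replace) {
                result.set(index, insertion.name);
            } else {
                result.add(index + 1, insertion.name);
            }
        }
        return result;
    }

    /** Breaking down a date column into day / month / year columns. */
    static List<Insertion> splitDate() {
        return List.of(
            new Insertion("day", "date", false),
            new Insertion("month", "day", false),
            new Insertion("year", "month", false));
    }

    public static void main(String[] args) {
        // prints [id, date, day, month, year, amount]
        System.out.println(apply(List.of("id", "date", "amount"), splitDate()));
        // the same declarations work when the date column is elsewhere:
        // prints [date, day, month, year, id]
        System.out.println(apply(List.of("date", "id"), splitDate()));
    }
}
```

With fixed indices, the second call would have placed the new columns relative to whatever happens to be at those positions in the other dataset.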

Column deletions

In addition to declaring insertions and replacements, an operation can declare a set of column names that it deletes.

Examples

  • Moving a column can be expressed as:

    • dependencies: none
    • insertions/replacements: a single insertion, to put a copy of the column at the new location
    • deletions: the original column
  • Reconciling a column can be expressed as:

    • dependencies: the column being reconciled and any other columns that are used in the reconciliation config
    • insertions/replacement: a single replacement, to replace the unreconciled column by the reconciled one
    • deletions: none
  • Adding column based on this column can be expressed as:

    • dependencies: they are declared if the expression can be analyzed to extract a set of dependent columns. For now, this is only implemented for GREL: Jython/Clojure expressions will always be treated as opaque, which means that the operation is considered to read all columns.
    • insertions/replacement: a single insertion, after the column it is based on
    • deletions: none
  • Reordering columns:
    The operation currently stores the final positions of the retained columns, which does not make it possible to express it as a series of insertions and deletions, but by making small changes to how the operation is defined we should be able to do so (see the corresponding issue)

  • Data extension:
    It is not a row-wise operation (because it can create additional rows to form a record structure when multi-valued properties are present), so for now it falls out of this framework and is therefore treated as an opaque operation (which can read/write arbitrary columns).
    Although the features of declaring column dependencies and insertions are available to operations using the records mode, this is currently only possible for "row-wise transformations applied in records mode", which means that the number of rows is preserved, even though the operation is able to read information from the record containing the row being processed. Record-wise operations in the sense of "mapping one record to another one (of possibly different length)" cannot yet be analyzed in such a fashion. I am not sure if/how to add support for that.

Overview of the current operation hierarchy

The Java classes which implement the Operation interface currently form the hierarchy below (with the "Operation" suffix removed, for readability).
The analysis of columnar dependencies is only available to the operations which extend the RowMapOperation base class. For each of those, I indicate if I was able to make them declare their dependencies and the columns they write to, or indicate why not.

All other operations are still treated as opaque. Some of those could potentially also be analyzed if the model is made flexible enough to accommodate for them.

  • AnnotateOneRow: like the following two, this can actually be formulated as a row map, but the operation currently returns additional metadata to enable in-place re-rendering, which is what has prevented me from reclassifying it as such so far. I think it should be doable though.
  • CellEdit: see above
  • ReconEdit: see above
  • EngineDependent: this is the base class for all operations which respect the row/records mode toggle and the facets
    • BlankDown: this is not a map, because a state is carried between consecutive row/records. But the columnar scope is very clear, so it would be nice to be able to expose that.
    • FillDown: same as above
    • ColumnSplit: the number of columns created depends on the data, so strictly speaking this is not a map. But the columnar action is also clear, so it would be nice to be able to expose that.
    • ExtendData: not captured yet, see the explanation above
    • PerformWikibaseEdits: not captured yet, because there is some non-locality in the way it updates cells reconciled to 'new' with the created items
    • ReconCopyAcrossColumns: it's such a weird operation that we should re-design or drop it
    • RowMap: the base class for all operations which can expose a columnar scope
      • ColumnMove: ✓ columnar scope exposed
      • ColumnRemoval: ✓ columnar scope exposed
      • ColumnRename: ✓ columnar scope exposed
      • ExpressionBased: base class for all operations which rely on evaluating an expression. The isolation of column dependencies is only supported for a subset of GREL expressions
        • ColumnAdditionByFetchingURLs: ✓ columnar scope exposed
        • ColumnAddition: ✓ columnar scope exposed
        • MassEdit: ✓ columnar scope exposed
        • TextTransform: ✓ columnar scope exposed
      • ReconClearSimilarCells: ✓ columnar scope exposed
      • ReconDiscardJudgments: ✓ columnar scope exposed
      • ReconJudgeSimilarCells: ✓ columnar scope exposed
      • ReconMarkNewTopics: ✓ columnar scope exposed
      • ReconMatchBestCandidates: ✓ columnar scope exposed
      • ReconMatchSpecificTopic: ✓ columnar scope exposed
      • ReconUseValuesAsIdentifiers: ✓ columnar scope exposed
      • Recon: ✓ columnar scope exposed
      • RowFlag: not really exposed, because flags and stars don't form actual columns so far (see this post)
      • RowStar: same as above
    • RowRemoval: this is not a row map, but it would still be interesting to expose the facet dependencies for this.
  • KeyValueColumnize: this will remain opaque
  • TransposeColumnsIntoRows: this will remain opaque
  • TransposeRowsIntoColumns: this will remain opaque
  • MultiValuedCellJoin: this could be analyzed if we are able to handle record maps
  • MultiValuedCellSplit: same as above
  • RowReorder: this is not a row map, but it would still be interesting to expose the columnar dependencies
  • SaveWikibaseSchema: the columnar dependencies of the schema could potentially also be exposed

As you can see we already capture a good chunk of the existing operations, including a lot of highly-used ones, but there is still room for more.

Discussion

The model proposed above is an attempt to capture a large part of OpenRefine's operations and render their actions analyzable within this model. It's designed with the following use cases in mind:

  • extract and reapply history (so that we are able to determine the requirements of a particular series of operations about the grid they are applied on and give the user a chance to rename column dependencies if there is a mismatch)
  • concurrent operations (to be able to run operations concurrently if they are not interdependent)
  • visualization of workflows (offering a graph-based representation of the operations and their dependencies, to better understand the structure of the history)
  • deleting / reordering operations in the history
  • the internal optimizations mentioned above (only affecting performance)

It's by no means a definitive model: I hope to expand it to be able to analyze more operations and represent them in a way that makes them generalize correctly in most cases. But maybe you already see some problems with the choices made here.

I have worked on the other type of concurrency mentioned in my first post above: row-wise concurrency.

Here is a reminder of the use case. Say you have a column to reconcile and then want to fetch more columns from the reconciled values. If the reconciliation process is easy enough (with the service reliably auto-matching cells), then you might just want to execute the data extension process directly after reconciliation. OpenRefine should not have to wait until the last cell is reconciled before fetching the new columns on the first few rows.

The crucial difficulty here is that the data extension process should not try to fetch the new columns for rows which are not reconciled yet. It needs to wait for the reconciliation process to output reconciled rows. One can think of it as a pipeline, where each operation is a row-processing device, with pipes connecting an operation to the next one.

Problem statement

First, how do long-running operations currently work in the new architecture? They work by defining a "change data producer", which, for each row or record, produces some data of a given type. This data is written to a dedicated file, with on each line a pair of the row/record id and the fetched data (serialized in JSON). The file is compressed with Gzip and split into multiple partitions. Each partition file is written to disk gradually, as the data is fetched. The new state of the project's grid is obtained by doing a join (in the terminology of relational databases) between the original project grid and the change data, joining the original rows with the fetched data to produce the new rows. Because both the original grid and the change data are sorted by row/record ids and have the same partition boundaries, this join can be computed efficiently using a joint scan of both operands. When executing this join while the change data is still being fetched, missing rows are replaced by placeholder values, which make it possible to display the grid before completion of the fetching process.
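The join described above can be sketched as a single pass over both sorted sequences. This is a simplified illustration and not the actual implementation: rows and fetched values are plain strings, partitioning and compression are ignored, and the names JoinSketch, join and PLACEHOLDER are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class JoinSketch {

    /** Stand-in for the placeholder shown while a value has not been fetched yet. */
    static final String PLACEHOLDER = "(pending)";

    /**
     * Joins rows with change data in one joint scan.
     * Both lists must be sorted by row id; the change data may be incomplete.
     */
    static List<String> join(List<Map.Entry<Long, String>> rows,
                             List<Map.Entry<Long, String>> changeData) {
        List<String> joined = new ArrayList<>();
        int cursor = 0; // position in the change data, only ever moves forward
        for (Map.Entry<Long, String> row : rows) {
            // Skip change data entries for ids that come before the current row
            while (cursor < changeData.size() && changeData.get(cursor).getKey() < row.getKey()) {
                cursor++;
            }
            String fetched = PLACEHOLDER;
            if (cursor < changeData.size() && changeData.get(cursor).getKey().equals(row.getKey())) {
                fetched = changeData.get(cursor).getValue();
                cursor++;
            }
            joined.add(row.getValue() + " | " + fetched);
        }
        return joined;
    }

    /** Three rows, of which only the first two have been fetched so far. */
    static List<String> example() {
        List<Map.Entry<Long, String>> rows = List.of(
            Map.entry(0L, "Paris"), Map.entry(1L, "Berlin"), Map.entry(2L, "Rome"));
        List<Map.Entry<Long, String>> changeData = List.of(
            Map.entry(0L, "Q90"), Map.entry(1L, "Q64"));
        return join(rows, changeData);
    }

    public static void main(String[] args) {
        // prints [Paris | Q90, Berlin | Q64, Rome | (pending)]
        System.out.println(example());
    }
}
```

Because neither cursor ever moves backwards, the cost is linear in the size of the two inputs, which is what makes the shared sort order and partition boundaries valuable.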

Consider the second long-running process (such as the data extension fetching process in this running example). It needs to read the grid it is executed on, but should not be able to read the rows with placeholder values. Instead, iterating on this grid should be done in a blocking way: the process should wait for the next row to be computed before being able to fetch it.

In UNIX terms, this behaviour is quite similar to piping processes together, either with the | symbol in bash or by using named pipes with mkfifo. However, in our case we also need to write the change data to disk (to be able to save the project, to display intermediate states with the Undo/Redo functionality, to be able to recover after a crash, and so on). So we need the second process to read the contents of the change data file that the first process writes to, similar to what the tail -f command does. So I have attempted to implement something that follows the same spirit.

Changes required on the project serialization format

The tail -f command works by listening to changes made to a file using the inotify API. Every time a write is made to this file, the operating system notifies the tail process about it, and it reads the file from where it stopped. Luckily there is a platform-independent interface to use this in Java, WatchService. So I have attempted to make a prototype based on that.
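For readers not familiar with WatchService, here is a minimal sketch of the tail -f idea on an uncompressed file. It is not the prototype's code: the class and method names are made up, the file is assumed to exist already, the appended bytes are assumed to end on a character boundary, and a periodic re-check is added as a safety net for platforms where notifications are delivered slowly.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

class TailSketch {

    /** Returns the text stored in the file after the given byte offset. */
    static String readFrom(Path file, long offset) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            long length = raf.length();
            if (length <= offset) {
                return "";
            }
            byte[] buffer = new byte[(int) (length - offset)];
            raf.seek(offset);
            raf.readFully(buffer);
            return new String(buffer, StandardCharsets.UTF_8);
        }
    }

    /** Blocks until the file grows past the offset (or the timeout elapses) and returns the new text. */
    static String awaitAppended(Path file, long offset, long timeoutMillis)
            throws IOException, InterruptedException {
        try (WatchService watcher = FileSystems.getDefault().newWatchService()) {
            // A WatchService watches directories, so we register the parent of the file
            file.toAbsolutePath().getParent().register(watcher, StandardWatchEventKinds.ENTRY_MODIFY);
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (true) {
                // Checking after registration avoids missing a write that happened just before
                String appended = readFrom(file, offset);
                if (!appended.isEmpty()) {
                    return appended;
                }
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return "";
                }
                // Sleep until notified, re-checking the file at least every 250 ms
                long wait = Math.min(remaining, TimeUnit.MILLISECONDS.toNanos(250));
                WatchKey key = watcher.poll(wait, TimeUnit.NANOSECONDS);
                if (key != null) {
                    key.pollEvents();
                    key.reset();
                }
            }
        }
    }

    /** One thread appends a line shortly after another one started waiting for it. */
    static String demo() {
        try {
            Path file = Files.createTempDirectory("tail-sketch").resolve("part-00000");
            Files.write(file, "0,\"a\"\n".getBytes(StandardCharsets.UTF_8));
            long offset = Files.size(file);
            Thread writer = new Thread(() -> {
                try {
                    Thread.sleep(100);
                    Files.write(file, "1,\"b\"\n".getBytes(StandardCharsets.UTF_8), StandardOpenOption.APPEND);
                } catch (IOException | InterruptedException e) {
                    throw new RuntimeException(e);
                }
            });
            writer.start();
            String appended = awaitAppended(file, offset, 5000);
            writer.join();
            return appended;
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.print(demo());
    }
}
```

The line format used in the demo is only illustrative.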

Compression

One problem is that our change data files are compressed in Gzip, and this compression format does not support resuming reading from an arbitrary point in the file: you always need to read from the start of the file. If the decompressor encounters an unexpected end of file (which we expect to happen in a context where the compressed file is being written to by another process), then there is no simple way to save the state of the decompressor and try reading again when more content is available.

Therefore I had to change the compression format to a compression codec which supports this better. I went for the ZSTD format because it is reasonably established, seems to have good performance and there is an actively maintained library, zstd-jni, which implements this codec with streaming support. The library is used by major projects such as Apache Spark and the like, so I think it's unlikely to go bust (and if it does we should be able to imitate those other major dependents). I would of course be happy to consider other compression formats.

End marker

When a process reads a change data file and reaches the end of the file, it needs to know whether that's the actual end of the partition or if more data might be written by the upstream operation later on. If the change data file ends abruptly in the middle of a line or in a compression block then it's clear that more data is coming, but it can also be that the file ends cleanly with a complete line, in which case there is no way to tell.

So to cater for this need I introduced an end marker, for now simply "end", added at the end of each partition file. It's unambiguous because all other lines start with the row/record number.

This marker is only there to signify the completion of a partition file: the completion of the entire change data is signified by adding an empty _SUCCESS file in the same directory as the partitions, just like Spark does.
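Here is a sketch of how a reader could interpret the (decompressed) contents of a partition file read so far, using the end marker. The class and method names are hypothetical, and so is the exact line format: the sketch only relies on each complete line being terminated by a newline and on data lines never being equal to "end".

```java
import java.util.ArrayList;
import java.util.List;

class EndMarkerSketch {

    static final String END_MARKER = "end";

    /** Complete data lines read so far, and whether the partition is known to be finished. */
    static final class Result {
        final List<String> lines;
        final boolean complete;

        Result(List<String> lines, boolean complete) {
            this.lines = lines;
            this.complete = complete;
        }
    }

    /** Parses what has been read so far from a partition file that may still be growing. */
    static Result parse(String contentSoFar) {
        List<String> lines = new ArrayList<>();
        int start = 0;
        int newline;
        while ((newline = contentSoFar.indexOf('\n', start)) >= 0) {
            String line = contentSoFar.substring(start, newline);
            start = newline + 1;
            if (line.equals(END_MARKER)) {
                return new Result(lines, true);
            }
            lines.add(line);
        }
        // Whatever remains is an unterminated line: either the marker itself,
        // or a line which is still being written by the upstream operation
        boolean complete = contentSoFar.substring(start).equals(END_MARKER);
        return new Result(lines, complete);
    }

    public static void main(String[] args) {
        Result pending = parse("0,\"a\"\n1,\"b\"\n");
        System.out.println(pending.lines.size() + " " + pending.complete); // prints 2 false
        Result finished = parse("0,\"a\"\nend\n");
        System.out.println(finished.lines.size() + " " + finished.complete); // prints 1 true
    }
}
```

When the result is not complete, the reader would keep waiting for more content instead of treating the end of the file as the end of the partition.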

Touching all partition files when starting to write a change data

So far, when writing the data fetched by an operation to disk, we would only start writing a partition file when it starts to get processed. This is a problem for synchronous fetching, because the observable number of partitions in the change data will change over time. The collections we use, PLLs, come with the assumption that their partitions are immutable. To keep this property (which is generally useful), I have decided to make sure that all partition files are touched (meaning, an empty file of the expected filename is created) before any actual writing happens. I think the performance cost associated with this change should only become noticeable with hundreds or thousands of partitions, meaning many millions of rows, so I think that's acceptable.

Changes required in the tool's architecture

Choosing between asynchronous and synchronous iteration

When iterating from a grid, we need to be able to switch between two behaviours:

  • the existing behaviour, which iterates over the entire grid and replaces missing values not computed yet by placeholders (used to display the grid in the web interface, or compute facets for instance). This should not require setting up any WatchService or have any additional overhead compared to what we are doing now.
  • the new behaviour, with a blocking iteration which is waiting for missing values to be computed, using the WatchService

I implemented this by providing an IterationContext to the iterate/compute methods of the PLL interface (see the corresponding change).

Better record grouping

Given a project grid, accessing its records implies deriving a PLL<Record> out of the PLL<IndexedRow> of the rows in the grid. Currently, whenever this record PLL is computed, we run a small task which inspects the records that are on partition boundaries. This has the benefit of making it possible to then iterate from a partition of records simply by iterating on the underlying partition of rows, grouping the rows into records on the fly and not having to look at the next partition for the end of last record.

But if the underlying collection of rows is not fully computed yet, then this does not work: the records at the partition boundaries might evolve after the grid has been fetched entirely. So we will need to make this record grouping lazier: do not run the task when creating the collection, but instead let a process iterating over a given partition of records also inspect the next partition of rows to finish the last record in the partition.
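The lazier grouping could look roughly like the sketch below. It is not implemented code: the Row class and method names are hypothetical, partitions are plain in-memory lists, and a record is assumed to start at each row whose key cell is not blank. In the real setting, peeking into the next partition would be a blocking read when that partition is still being computed.

```java
import java.util.ArrayList;
import java.util.List;

class LazyRecordGroupingSketch {

    /** Hypothetical row: a key cell (blank for continuation rows) and some value. */
    static final class Row {
        final String key;
        final String value;

        Row(String key, String value) {
            this.key = key;
            this.value = value;
        }

        boolean startsRecord() {
            return key != null && !key.isEmpty();
        }
    }

    /** Groups the rows of one partition into records (represented by their values). */
    static List<List<String>> recordsOfPartition(List<List<Row>> partitions, int index) {
        List<Row> rows = partitions.get(index);
        List<List<String>> records = new ArrayList<>();
        int i = 0;
        if (index > 0) {
            // Leading continuation rows belong to a record started in an earlier partition
            while (i < rows.size() && !rows.get(i).startsRecord()) {
                i++;
            }
        }
        List<String> current = null;
        for (; i < rows.size(); i++) {
            Row row = rows.get(i);
            if (current == null || row.startsRecord()) {
                current = new ArrayList<>();
                records.add(current);
            }
            current.add(row.value);
        }
        if (current != null) {
            // Finish the last record by looking at the following partitions
            boolean finished = false;
            for (int next = index + 1; next < partitions.size() && !finished; next++) {
                for (Row row : partitions.get(next)) {
                    if (row.startsRecord()) {
                        finished = true;
                        break;
                    }
                    current.add(row.value);
                }
            }
        }
        return records;
    }

    static List<List<Row>> example() {
        return List.of(
            List.of(new Row("Paris", "a"), new Row("", "b"), new Row("Rome", "c")),
            List.of(new Row("", "d"), new Row("", "e"), new Row("Oslo", "f")),
            List.of(new Row("", "g")));
    }

    public static void main(String[] args) {
        System.out.println(recordsOfPartition(example(), 0)); // prints [[a, b], [c, d, e]]
        System.out.println(recordsOfPartition(example(), 1)); // prints [[f, g]]
        System.out.println(recordsOfPartition(example(), 2)); // prints []
    }
}
```

No upfront task is needed here: each partition of records is derived on demand from its partition of rows and, at most, the beginning of the following ones.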

I have not implemented this approach yet but I think this should overall be an improvement, even in other use cases, as we are sparing ourselves a task, which should make accessing records faster overall.

As always, I would welcome feedback on the approach. I do realize that this goes rather deep into the inner workings of the new architecture that basically no one other than me is familiar with, but perhaps people have thoughts about the high-level approach and about some implementation details such as the reliance on the WatchService and Zstd-JNI.

Overall, although the architectural changes are quite involved, I think this is a change that is worth pursuing because:

  • it will have a big impact on the user experience, significantly reducing the frequency of cases where the user needs to wait for the completion of a long-running operation before carrying on with other operations
  • because the project serialization format is still easy to change as we haven't released this architecture, now is a good time to investigate what is needed to support this.

I made a draft PR which contains the changes I made towards this so far: Support for synchronous iteration from partially computed grids by wetneb · Pull Request #343 · wetneb/OpenRefine · GitHub

What happens if the fetch never completes some rows which are needed to later compute the record groupings? I.e. do our internal timeouts eventually take care of closing the long-running operation and subsequent pipeline? And what state are those unfinished fetched rows in? Finally, does the WatchService eventually get a signal that “you don’t need to watch these unfinished fetch rows”?

There aren't any timeouts at the moment: as long as the operation keeps running, we'll wait for its results. The WatchService is currently closed whenever the consumer stops iterating over the grid (for instance when the downstream operation completes or is canceled).
Which use cases do you have in mind to introduce such timeouts?

Networking issues. But I guess the lower level HTTP client library we use will default to a 3 minute timeout unless we are overriding it in our config code?

So each fetch I guess will be closed eventually in the case of networking connectivity issues?

Ah, there is no networking happening at all at this level. Each OpenRefine long-running operation is executed by separate thread(s) in the same Java program (OpenRefine itself). Currently there is no communication between operations because they are only run in parallel if they are column-wise independent. The proposed row-wise concurrency would introduce a form of communication between operations that would be entirely file-based: the second operation reads the results of the first operation by reading the files where the first operation writes its data.
That is, there is no networking between operations at all.

Here is a quick demo of what the user experience could look like for row-wise concurrency:

Beyond the scenario of reconciliation followed by data extension presented in the video, I think something like this could be useful in a fairly wide range of scenarios. For instance, the URL-fetching operation is generally very slow if you choose a long wait time between requests, to be polite to the web server. Generally, the HTTP response is processed further, for instance via GREL/Python or reconciliation (see for instance this tutorial). This feature would allow the user to experiment with those downstream processing steps without having to wait for the URL-fetching operation to complete.

After experimenting with it a little, it has become quite clear that the current restriction to row-wise operations is limiting the reach of the feature: records mode support would be really useful. For instance, the data extension operation always runs in records mode, because if the reconciliation service returns multiple values then a record structure is created. I would keep this task on the back burner for now, as I think it's worth validating the basic principle with users first, before investing more time on such a feature, which is not central to the reproducibility topic anyway (it is already an extension of column-based concurrency).

But still, I am quite enthusiastic about the potential effects such a feature can have. Intuitively, it should:

  • make working with slow reconciliation services much less frustrating, because the time it takes for reconciliation to complete isn't such a big deal anymore. Similarly with scraping or working with restrictive APIs.

  • reduce the need to artificially restrict long-running operations on subsets of the dataset to make them tractable. At the moment, this is something I see done very often. The problem is that once you have done many other operations following your original costly operation, there isn't really a good way to relax the original filter and run the whole workflow on a larger part of the dataset:

    • you can fiddle with the JSON representation of the workflow manually (which is pretty error-prone), and even then, you'd be losing all the results of the long-running operation on the subset of the dataset where you have executed it already
    • you can re-run the same steps manually on other parts of the dataset, but for any operation that creates new columns (URL fetching, data extension) you'll have to manually merge the newly-created columns into the ones that were originally created when you first ran the operation on the subset
    • maybe other approaches?

    Instead, with this feature you can just run the operation on the entire dataset, maybe pause it if you want to spare the servers you are hitting, do your downstream processing (evaluating it on the rows that have been already fetched) and later on just resume the original operation to have it process the rest of the dataset, along with all of the downstream operations.

  • it brings us closer to the ability of executing entire workflows in a streaming fashion, from the import stage to the export. This would be particularly interesting in a CLI environment, where you'd be able to do something like this (with a purely fictional and non-vetted syntax):
      cat source_data.csv | refine my_workflow.json > cleaned_data.tsv
    assuming the file my_workflow.json encodes the import metadata, series of operations and export format. If all the operations are row/record wise, then the entire processing could be done in a streaming mode.

Pipelining operations, even custom intermediate operations, are now supported with JDK 23. Java itself is incorporating more and more Stream / Pipelining / Workflow support directly in its core. (Apache Beam / Spark windowing operations, and so forth)

@antonin_d This looks really useful, I've read through it fully, and can see how it might help in a few ways, for one, being able to cancel an intermediate operation, as well as parallel operations for those that are parallel-capable, etc.:
https://openjdk.org/jeps/473