diff --git a/.semaphore/post_install.yml b/.semaphore/post_install.yml index 30ae30bc..6c7a83a4 100644 --- a/.semaphore/post_install.yml +++ b/.semaphore/post_install.yml @@ -2,7 +2,7 @@ version: v1.0 name: post-install agent: machine: - type: s1-prod-ubuntu22-04-amd64-1 + type: s1-prod-ubuntu24-04-amd64-1 global_job_config: prologue: @@ -26,7 +26,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-1 + type: s1-prod-ubuntu24-04-amd64-1 prologue: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' @@ -45,7 +45,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-arm64-1 + type: s1-prod-ubuntu24-04-arm64-1 prologue: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' @@ -64,7 +64,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-1 + type: s1-prod-ubuntu24-04-amd64-1 prologue: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' @@ -81,7 +81,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-arm64-1 + type: s1-prod-ubuntu24-04-arm64-1 prologue: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' @@ -98,7 +98,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-1 + type: s1-prod-ubuntu24-04-amd64-1 prologue: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' @@ -117,7 +117,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-arm64-1 + type: s1-prod-ubuntu24-04-arm64-1 prologue: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' diff --git a/.semaphore/project.yml b/.semaphore/project.yml deleted file mode 100644 index 7bef71a4..00000000 --- a/.semaphore/project.yml +++ /dev/null @@ -1,44 +0,0 @@ -# This file is managed by ServiceBot plugin - Semaphore. The content in this file is created using a common -# template and configurations in service.yml. -# Modifications in this file will be overwritten by generated content in the nightly run. -# For more information, please refer to the page: -# https://confluentinc.atlassian.net/wiki/spaces/Foundations/pages/2871296194/Add+SemaphoreCI -apiVersion: v1alpha -kind: Project -metadata: - name: confluent-kafka-javascript - description: "" -spec: - visibility: private - repository: - url: git@github.com:confluentinc/confluent-kafka-javascript.git - run_on: - - branches - - tags - - pull_requests - pipeline_file: .semaphore/semaphore.yml - integration_type: github_app - status: - pipeline_files: - - path: .semaphore/semaphore.yml - level: pipeline - whitelist: - branches: - - master - - main - - /^v\d+\.\d+\.x$/ - - /^gh-readonly-queue.*/ - custom_permissions: true - debug_permissions: - - empty - - default_branch - - non_default_branch - - pull_request - - forked_pull_request - - tag - attach_permissions: - - default_branch - - non_default_branch - - pull_request - - forked_pull_request - - tag diff --git a/.semaphore/project_public.yml b/.semaphore/project_public.yml deleted file mode 100644 index fb83ce58..00000000 --- a/.semaphore/project_public.yml +++ /dev/null @@ -1,25 +0,0 @@ -# This file is managed by ServiceBot plugin - Semaphore. The content in this file is created using a common -# template and configurations in service.yml. 
-# Modifications in this file will be overwritten by generated content in the nightly run. -# For more information, please refer to the page: -# https://confluentinc.atlassian.net/wiki/spaces/Foundations/pages/2871296194/Add+SemaphoreCI -apiVersion: v1alpha -kind: Project -metadata: - name: confluent-kafka-javascript - description: "" -spec: - visibility: public - repository: - url: git@github.com:confluentinc/confluent-kafka-javascript.git - run_on: - - forked_pull_requests - pipeline_file: .semaphore/semaphore.yml - integration_type: github_app - status: - pipeline_files: - - path: .semaphore/semaphore.yml - level: pipeline - forked_pull_requests: - allowed_contributors: - - "ConfluentSemaphore" diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index f6562c16..d25079c9 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -8,7 +8,7 @@ version: v1.0 name: build-test-release agent: machine: - type: s1-prod-ubuntu22-04-amd64-1 + type: s1-prod-ubuntu24-04-amd64-1 auto_cancel: running: @@ -38,7 +38,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-2 + type: s1-prod-ubuntu24-04-amd64-2 prologue: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' @@ -52,7 +52,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-arm64-1 + type: s1-prod-ubuntu24-04-arm64-1 prologue: commands: - '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY' @@ -66,7 +66,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-arm64-1 + type: s1-prod-ubuntu24-04-arm64-1 jobs: - name: "Build from source and test" commands: @@ -92,7 +92,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-2 + type: s1-prod-ubuntu24-04-amd64-2 prologue: commands: - export CKJS_LINKING=dynamic @@ -121,7 +121,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-2 + type: s1-prod-ubuntu24-04-amd64-2 prologue: commands: - npm install @@ -142,7 +142,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-3 + type: s1-prod-ubuntu24-04-amd64-3 env_vars: - name: TARGET_PRODUCE_PERFORMANCE value: "35" @@ -180,7 +180,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-2 + type: s1-prod-ubuntu24-04-amd64-2 env_vars: - name: ARCHITECTURE value: "x64" @@ -216,7 +216,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-arm64-1 + type: s1-prod-ubuntu24-04-arm64-1 env_vars: - name: ARCHITECTURE value: "arm64" @@ -252,7 +252,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-1 + type: s1-prod-ubuntu24-04-amd64-1 env_vars: - name: ARCHITECTURE value: "x64" @@ -288,7 +288,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-arm64-1 + type: s1-prod-ubuntu24-04-arm64-1 env_vars: - name: ARCHITECTURE value: "arm64" @@ -460,7 +460,7 @@ blocks: task: agent: machine: - type: s1-prod-ubuntu22-04-amd64-2 + type: s1-prod-ubuntu24-04-amd64-2 jobs: - name: "Tarball" commands: @@ -472,7 +472,7 @@ after_pipeline: task: agent: machine: - type: s1-prod-ubuntu20-04-amd64-1 + type: s1-prod-ubuntu24-04-amd64-1 jobs: - name: SonarQube commands: diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c74eb98..95275824 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,14 @@ +# confluent-kafka-javascript v1.3.1 + +v1.3.1 is a maintenance release. It is supported for all usage. + +## Fixes + +1. 
Avoid a race condition that causes 100% usage of a CPU core when + consuming with `partitionsConsumedConcurrently > 1` and all messages + are consumed (#300) + + # confluent-kafka-javascript v1.3.0 v1.3.0 is a feature release. It is supported for all usage. @@ -6,6 +17,13 @@ v1.3.0 is a feature release. It is supported for all usage. 1. References librdkafka v2.10.0. Refer to the [librdkafka v2.10.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.10.0) for more information. +## Fixes + +1. Support Protobuf oneof fields in Data Contract rules (#261) +2. Ensure use of DEK object is thread-safe (#268) +3. Fix token fetch to return after successful request (#278) +4. Add loose OAuth authorization (#289) + # confluent-kafka-javascript v1.2.0 diff --git a/lib/kafkajs/_consumer.js b/lib/kafkajs/_consumer.js index c6410547..8de46c6e 100644 --- a/lib/kafkajs/_consumer.js +++ b/lib/kafkajs/_consumer.js @@ -24,7 +24,6 @@ const { const { Buffer } = require('buffer'); const MessageCache = require('./_consumer_cache'); const { hrtime } = require('process'); -const { LinkedList } = require('./_linked-list'); const ConsumerState = Object.freeze({ INIT: 0, @@ -203,11 +202,10 @@ class Consumer { * It's set to null when no fetch is in progress. */ #fetchInProgress; - /** - * List of DeferredPromises waiting on consumer queue to be non-empty. + * Are we waiting for the queue to be non-empty? */ - #queueWaiters = new LinkedList(); + #nonEmpty = null; /** * Whether any rebalance callback is in progress. @@ -363,7 +361,6 @@ class Consumer { */ async #rebalanceCallback(err, assignment) { const isLost = this.#internalClient.assignmentLost(); - this.#rebalanceCbInProgress = new DeferredPromise(); let assignmentFnCalled = false; this.#logger.info( `Received rebalance event with message: '${err.message}' and ${assignment.length} partition(s), isLost: ${isLost}`, @@ -468,7 +465,7 @@ class Consumer { */ const workersToSpawn = Math.max(1, Math.min(this.#concurrency, this.#partitionCount)); if (workersToSpawn !== this.#workers.length) { - this.#workerTerminationScheduled.resolve(); + this.#resolveWorkerTerminationScheduled(); /* We don't need to await the workers here. We are OK if the termination and respawning * occurs later, since even if we have a few more or few less workers for a while, it's * not a big deal. */ @@ -639,11 +636,14 @@ class Consumer { /* Certain properties that the user has set are overridden. We use trampolines to accommodate the user's callbacks. * TODO: add trampoline method for offset commit callback. */ rdKafkaConfig['offset_commit_cb'] = true; - rdKafkaConfig['rebalance_cb'] = (err, assignment) => this.#rebalanceCallback(err, assignment).catch(e => + rdKafkaConfig['rebalance_cb'] = (err, assignment) => { + this.#rebalanceCbInProgress = new DeferredPromise(); + this.#rebalanceCallback(err, assignment).catch(e => { if (this.#logger) this.#logger.error(`Error from rebalance callback: ${e.stack}`); }); + }; /* We handle offset storage within the promisified API by ourselves. Thus we don't allow the user to change this * setting and set it to false. 
*/ @@ -904,6 +904,7 @@ class Consumer { const returnPayload = { batch, _stale: false, + _seeked: false, _lastResolvedOffset: { offset: -1, leaderEpoch: -1 }, heartbeat: async () => { /* no op */ }, pause: this.pause.bind(this, [{ topic, partitions: [partition] }]), @@ -922,9 +923,25 @@ async #fetchAndResolveWith(takeFromCache, size) { if (this.#fetchInProgress) { + await this.#fetchInProgress; + /* Restart the checks, as a new fetch + * might already be in progress. */ + return null; + } + + if (this.#nonEmpty) { + await this.#nonEmpty; + /* Restart the checks, as a new fetch + * might already be in progress. */ return null; } + if (this.#workerTerminationScheduled.resolved) { + /* Return without fetching. */ + return null; + } + + let err, messages, processedRebalance = false; try { this.#fetchInProgress = new DeferredPromise(); const fetchResult = new DeferredPromise(); @@ -933,8 +950,9 @@ this.#internalClient.consume(size, (err, messages) => fetchResult.resolve([err, messages])); - let [err, messages] = await fetchResult; + [err, messages] = await fetchResult; if (this.#rebalanceCbInProgress) { + processedRebalance = true; await this.#rebalanceCbInProgress; this.#rebalanceCbInProgress = null; } @@ -956,6 +974,8 @@ } finally { this.#fetchInProgress.resolve(); this.#fetchInProgress = null; + if (!err && !processedRebalance && this.#messageCache.assignedSize === 0) + this.#nonEmpty = new DeferredPromise(); } } @@ -973,10 +993,13 @@ } /* It's possible that we get msg = null, but that's because partitionConcurrency - * exceeds the number of partitions containing messages. So in this case, - * we should not call for new fetches, rather, try to focus on what we have left. + * exceeds the number of partitions containing messages. So + * we should wait for a new partition to be available. */ if (!msg && this.#messageCache.assignedSize !== 0) { + await this.#messageCache.availablePartitions(); + /* Restart the checks, as the cache + * might be full by now. */ return null; } @@ -1000,10 +1023,13 @@ } /* It's possible that we get msgs = null, but that's because partitionConcurrency - * exceeds the number of partitions containing messages. So in this case, - * we should not call for new fetches, rather, try to focus on what we have left. + * exceeds the number of partitions containing messages. So + * we should wait for a new partition to be available. */ if (!msgs && this.#messageCache.assignedSize !== 0) { + await this.#messageCache.availablePartitions(); + /* Restart the checks, as the cache + * might be full by now. */ return null; } @@ -1316,7 +1342,7 @@ /* If any message is unprocessed, either due to an error or due to the user not marking it processed, we must seek * back to get it so it can be reprocessed. */ - if (lastOffsetProcessed.offset !== lastOffset) { + if (!payload._seeked && lastOffsetProcessed.offset !== lastOffset) { const offsetToSeekTo = lastOffsetProcessed.offset === -1 ? firstMessage.offset : (lastOffsetProcessed.offset + 1); const leaderEpoch = lastOffsetProcessed.offset === -1 ?
firstMessage.leaderEpoch : lastOffsetProcessed.leaderEpoch; this.seek({ @@ -1348,36 +1374,27 @@ class Consumer { return ppc; } - #queueNonEmptyCb() { - for (const waiter of this.#queueWaiters) { - waiter.resolve(); + #notifyNonEmpty() { + if (this.#nonEmpty) { + this.#nonEmpty.resolve(); + this.#nonEmpty = null; } + if (this.#messageCache) + this.#messageCache.notifyAvailablePartitions(); } - async #nextFetchRetry() { - if (this.#fetchInProgress) { - await this.#fetchInProgress; - } else { - /* Backoff a little. If m is null, we might be without messages - * or in available partition starvation, and calling consumeSingleCached - * in a tight loop will help no one. - * In case there is any message in the queue, we'll be woken up before the - * timer expires. - * We have a per-worker promise, otherwise we end up awakening - * other workers when they've already looped and just restarted awaiting. - * The `Promise` passed to `Timer.withTimeout` cannot be reused - * in next call to this method, to avoid memory leaks caused - * by `Promise.race`. */ - const waiter = new DeferredPromise(); - const waiterNode = this.#queueWaiters.addLast(waiter); - await Timer.withTimeout(1000, waiter); - - /* Resolves the "extra" promise that has been spawned when creating the timer. */ - waiter.resolve(); - this.#queueWaiters.remove(waiterNode); - } - } + #queueNonEmptyCb() { + const nonEmptyAction = async () => { + if (this.#fetchInProgress) + await this.#fetchInProgress; + this.#notifyNonEmpty(); + }; + nonEmptyAction().catch((e) => { + this.#logger.error(`Error in queueNonEmptyCb: ${e}`, + this.#createConsumerBindingMessageMetadata()); + }); + } /** * Starts a worker to fetch messages/batches from the internal consumer and process them. * @@ -1393,27 +1410,24 @@ class Consumer { */ async #worker(config, perMessageProcessor, fetcher) { let ppc = null; - while (!this.#workerTerminationScheduled.resolved) { + try { + const ms = await fetcher(ppc); + if (!ms) + continue; - const ms = await fetcher(ppc).catch(e => { + if (this.#pendingOperations.length) { + ppc = this.#discardMessages(ms, ppc); + break; + } + + ppc = await perMessageProcessor(ms, config); + } catch (e) { /* Since this error cannot be exposed to the user in the current situation, just log and retry. * This is due to restartOnFailure being set to always true. */ if (this.#logger) this.#logger.error(`Consumer encountered error while consuming. Retrying. 
Error details: ${e} : ${e.stack}`, this.#createConsumerBindingMessageMetadata()); - }); - - if (this.#pendingOperations.length) { - ppc = this.#discardMessages(ms, ppc); - break; - } - - if (!ms) { - await this.#nextFetchRetry(); - continue; } - - ppc = await perMessageProcessor(ms, config); } if (ppc) @@ -1447,19 +1461,32 @@ class Consumer { * @private */ async #cacheExpirationLoop() { + const cacheExpirationInterval = BigInt(this.#cacheExpirationTimeoutMs * 1e6); + const maxFetchInterval = BigInt(1000 * 1e6); while (!this.#workerTerminationScheduled.resolved) { let now = hrtime.bigint(); - const cacheExpiration = this.#lastFetchClockNs + - BigInt(this.#cacheExpirationTimeoutMs * 1e6); + const cacheExpirationTimeout = this.#lastFetchClockNs + + cacheExpirationInterval; + const maxFetchTimeout = this.#lastFetchClockNs + + maxFetchInterval; - if (now > cacheExpiration) { + if (now > cacheExpirationTimeout) { this.#addPendingOperation(() => this.#clearCacheAndResetPositions()); await this.#checkMaxPollIntervalNotExceeded(now); break; } + if (now > maxFetchTimeout) { + /* We need to continue fetching even when we're + * not getting any messages, for example when all partitions are + * paused. */ + this.#notifyNonEmpty(); + } - let interval = Number(cacheExpiration - now) / 1e6; + const awakeTime = maxFetchTimeout < cacheExpirationTimeout ? + maxFetchTimeout : cacheExpirationTimeout; + + let interval = Number(awakeTime - now) / 1e6; if (interval < 100) interval = 100; await Timer.withTimeout(interval, this.#maxPollIntervalRestart); @@ -1481,6 +1508,13 @@ class Consumer { this.#pendingOperations = []; } + #resolveWorkerTerminationScheduled() { + if (this.#workerTerminationScheduled) { + this.#workerTerminationScheduled.resolve(); + this.#queueNonEmptyCb(); + } + } + /** * Internal polling loop. * Spawns and awaits workers until disconnect is initiated. @@ -1662,7 +1696,7 @@ class Consumer { #addPendingOperation(fun) { if (this.#pendingOperations.length === 0) { - this.#workerTerminationScheduled.resolve(); + this.#resolveWorkerTerminationScheduled(); } this.#pendingOperations.push(fun); } @@ -1727,11 +1761,15 @@ class Consumer { } } - #markBatchPayloadsStale(topicPartitions) { + #markBatchPayloadsStale(topicPartitions, isSeek) { for (const topicPartition of topicPartitions) { const key = partitionKey(topicPartition); - if (this.#topicPartitionToBatchPayload.has(key)) - this.#topicPartitionToBatchPayload.get(key)._stale = true; + if (this.#topicPartitionToBatchPayload.has(key)) { + const payload = this.#topicPartitionToBatchPayload.get(key); + payload._stale = true; + if (isSeek) + payload._seeked = true; + } } } @@ -1757,7 +1795,7 @@ class Consumer { } } if (seekOffsets.length) { - await this.#seekInternal(seekOffsets, false); + await this.#seekInternal(seekOffsets); } } @@ -1801,7 +1839,7 @@ class Consumer { } /* If anyone's using eachBatch, mark the batch as stale. 
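For illustration, the staleness/seek bookkeeping used by the batch path reduces to the following sketch (TypeScript; `BatchPayload` is a hypothetical stand-in for the per-partition batch payload object, and the function mirrors `#markBatchPayloadsStale`):

interface BatchPayload {
  _stale: boolean;   // worker must stop resolving offsets from this batch
  _seeked: boolean;  // a user seek already repositioned the partition
}

// A pause or pending operation marks the batch stale; a seek additionally
// sets _seeked so the post-batch code skips its own compensating seek
// (seeking back to the first unprocessed offset would undo the user's seek).
function markBatchPayloadsStale(payloads: BatchPayload[], isSeek: boolean): void {
  for (const payload of payloads) {
    payload._stale = true;
    if (isSeek) {
      payload._seeked = true;
    }
  }
}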
*/ - this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset]); + this.#markBatchPayloadsStale([rdKafkaTopicPartitionOffset], true); this.#addPendingOperation(() => this.#seekInternal([rdKafkaTopicPartitionOffset])); @@ -2010,7 +2048,7 @@ } this.#disconnectStarted = true; - this.#workerTerminationScheduled.resolve(); + this.#resolveWorkerTerminationScheduled(); this.#logger.debug("Signalling disconnection attempt to workers", this.#createConsumerBindingMessageMetadata()); await this.#lock.write(async () => { diff --git a/lib/kafkajs/_consumer_cache.js b/lib/kafkajs/_consumer_cache.js index f0ad8a00..9047688e 100644 --- a/lib/kafkajs/_consumer_cache.js +++ b/lib/kafkajs/_consumer_cache.js @@ -1,5 +1,6 @@ const { partitionKey, + DeferredPromise, } = require('./_common'); const { LinkedList } = require('./_linked-list'); @@ -74,6 +75,8 @@ class MessageCache { /* LinkedList of assigned partitions. */ #assignedPartitions; + /* Promise that is resolved when there are available partitions. */ + #availablePartitionsPromise = new DeferredPromise(); constructor(logger) { this.logger = logger ?? console; @@ -130,6 +133,7 @@ cache = new PerPartitionMessageCache(key); this.#tpToPpc.set(key, cache); cache._node = this.#availablePartitions.addLast(cache); + this.notifyAvailablePartitions(); } cache._add(message); } @@ -197,6 +201,7 @@ this.#assignedPartitions.remove(ppc._node); ppc._node = this.#availablePartitions.addLast(ppc); ppc._assigned = false; + this.notifyAvailablePartitions(); } } @@ -260,6 +265,21 @@ } this.#reinit(); } + + /** + * Notifies awaiters that there are available partitions to take. + */ + notifyAvailablePartitions() { + this.#availablePartitionsPromise.resolve(); + this.#availablePartitionsPromise = new DeferredPromise(); + } + + /** + * Promise that resolves when there are available partitions to take.
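The cache-side notification above pairs with the consumer's `#nonEmpty` promise; both follow the same resolve-and-replace pattern. A minimal sketch, assuming a bare-bones `DeferredPromise` in place of the helper exported from `./_common` (the real helper is directly awaitable; this stand-in exposes a `.promise` field):

class DeferredPromise<T = void> {
  promise: Promise<T>;
  resolve!: (value: T) => void;
  constructor() {
    this.promise = new Promise<T>((res) => (this.resolve = res));
  }
}

let availablePartitions = new DeferredPromise();

// Waiters park on the *current* promise...
async function waitForAvailablePartitions(): Promise<void> {
  await availablePartitions.promise;
}

// ...and each notification resolves it and swaps in a fresh one, so only
// the waiters already parked are woken; later arrivals park on the new
// promise and wait for the next notification.
function notifyAvailablePartitions(): void {
  availablePartitions.resolve();
  availablePartitions = new DeferredPromise();
}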
+ */ + async availablePartitions() { + return this.#availablePartitionsPromise; + } } module.exports = MessageCache; diff --git a/package-lock.json b/package-lock.json index d21e34a5..c6b3cf80 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1999,6 +1999,7 @@ "version": "2.2.3", "resolved": "https://registry.npmjs.org/@bufbuild/protobuf/-/protobuf-2.2.3.tgz", "integrity": "sha512-tFQoXHJdkEOSwj5tRIZSPNUuXK3RaR7T1nUrPgbYX1pUbvqqaaZAsfo+NXBPsz5rZMSKVFrgK1WL8Q/MSLvprg==", + "dev": true, "license": "(Apache-2.0 AND BSD-3-Clause)" }, "node_modules/@bufbuild/protoc-gen-es": { @@ -11554,13 +11555,14 @@ }, "schemaregistry": { "name": "@confluentinc/schemaregistry", - "version": "1.3.0", + "version": "1.3.1", "license": "MIT", "dependencies": { "@aws-sdk/client-kms": "^3.637.0", "@aws-sdk/credential-providers": "^3.637.0", "@azure/identity": "^4.4.1", "@azure/keyvault-keys": "^4.8.0", + "@bufbuild/cel": "^0.1.0", "@bufbuild/protobuf": "^2.0.0", "@criteria/json-schema": "^0.10.0", "@criteria/json-schema-validation": "^0.10.0", @@ -11580,6 +11582,7 @@ "lru-cache": "^10.4.3", "node-vault": "^0.10.2", "simple-oauth2": "^5.1.0", + "uuid": "^11.0.0", "validator": "^13.12.0" }, "devDependencies": { @@ -11600,8 +11603,7 @@ "node-gyp": "^9.3.1", "ts-jest": "^29.2.4", "typescript": "^5.5.4", - "typescript-eslint": "^8.2.0", - "uuid": "^10.0.0" + "typescript-eslint": "^8.2.0" } }, "schemaregistry-examples": { @@ -11616,6 +11618,33 @@ "uuid": "^10.0.0" } }, + "schemaregistry/node_modules/@bufbuild/cel": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/@bufbuild/cel/-/cel-0.1.0.tgz", + "integrity": "sha512-lQJnvQhGLIysYHmXm0rGijlpCH+siGKLkp379mgYXBgyfvf4PHoA2MyVdCED4tdBdsHhQ0sVTkd+8YmvUcKb6w==", + "license": "Apache-2.0", + "dependencies": { + "@bufbuild/cel-spec": "0.1.0" + }, + "peerDependencies": { + "@bufbuild/protobuf": "^2.2.5" + } + }, + "schemaregistry/node_modules/@bufbuild/cel-spec": { + "version": "0.1.0", + "resolved": "https://registry.npmjs.org/@bufbuild/cel-spec/-/cel-spec-0.1.0.tgz", + "integrity": "sha512-c2wC734n9UIJ2MsNj48YgO0siUkF/j4bHiSXZcK+cU1Qwo1Z8lKYTRwjK5Mm6dvOV2MnkWj4FZRN59VvbpFQag==", + "license": "Apache-2.0", + "peerDependencies": { + "@bufbuild/protobuf": "^2.2.5" + } + }, + "schemaregistry/node_modules/@bufbuild/protobuf": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/@bufbuild/protobuf/-/protobuf-2.3.0.tgz", + "integrity": "sha512-WK6zH4MtBp/uesX8KGCnwDDRVnEVHUvwjsigKXcSR57Oo8Oyv1vRS9qyUlSP+6KWRl5z8tNAU5qpf3QodeVYxA==", + "license": "(Apache-2.0 AND BSD-3-Clause)" + }, "schemaregistry/node_modules/ajv": { "version": "8.17.1", "resolved": "https://registry.npmjs.org/ajv/-/ajv-8.17.1.tgz", @@ -11637,6 +11666,18 @@ "resolved": "https://registry.npmjs.org/json-schema-traverse/-/json-schema-traverse-1.0.0.tgz", "integrity": "sha512-NM8/P9n3XjXhIZn1lLhkFaACTOURQXjWhV4BA/RnOv8xvgqtqpAX9IO4mRQxSx1Rlo4tqzeqb0sOlruaOy3dug==", "license": "MIT" + }, + "schemaregistry/node_modules/uuid": { + "version": "11.1.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-11.1.0.tgz", + "integrity": "sha512-0/A9rDy9P7cJ+8w1c9WD9V//9Wj15Ce2MPz8Ri6032usz+NfePxx5AcN3bN+r6ZL6jEo066/yNYB3tn4pQEx+A==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/esm/bin/uuid" + } } } } diff --git a/schemaregistry/e2e/schemaregistry-client.spec.ts b/schemaregistry/e2e/schemaregistry-client.spec.ts index 2d7dfcad..913a2735 100644 --- a/schemaregistry/e2e/schemaregistry-client.spec.ts +++ 
b/schemaregistry/e2e/schemaregistry-client.spec.ts @@ -129,6 +129,7 @@ describe('SchemaRegistryClient Integration Test', () => { const schemaMetadata: SchemaMetadata = { id: registerResponse?.id!, + guid: registerResponse?.guid!, version: schemaVersion, schema: schemaInfo.schema, subject: testSubject, @@ -137,6 +138,7 @@ describe('SchemaRegistryClient Integration Test', () => { const schemaMetadata2: SchemaMetadata = { id: registerResponse2?.id!, + guid: registerResponse2?.guid!, version: registerResponse2?.version!, schema: backwardCompatibleSchemaInfo.schema, subject: testSubject, diff --git a/schemaregistry/mock-schemaregistry-client.ts b/schemaregistry/mock-schemaregistry-client.ts index 51eb850b..222c7a28 100644 --- a/schemaregistry/mock-schemaregistry-client.ts +++ b/schemaregistry/mock-schemaregistry-client.ts @@ -8,8 +8,10 @@ import { ServerConfig } from './schemaregistry-client'; import stringify from "json-stringify-deterministic"; +import {v4} from "uuid"; import {ClientConfig} from "./rest-service"; import {RestError} from "./rest-error"; +import {SchemaId} from "./serde/serde"; interface VersionCacheEntry { version: number; @@ -45,6 +47,7 @@ class MockClient implements Client { private clientConfig?: ClientConfig; private infoToSchemaCache: Map; private idToSchemaCache: Map; + private guidToSchemaCache: Map; private schemaToVersionCache: Map; private configCache: Map; private counter: Counter; @@ -53,6 +56,7 @@ class MockClient implements Client { this.clientConfig = config this.infoToSchemaCache = new Map(); this.idToSchemaCache = new Map(); + this.guidToSchemaCache = new Map(); this.schemaToVersionCache = new Map(); this.configCache = new Map(); this.counter = new Counter(); @@ -67,7 +71,7 @@ class MockClient implements Client { if (!metadata) { throw new RestError("Failed to register schema", 422, 42200); } - return metadata.id; + return metadata.id!; } async registerFullResponse(subject: string, schema: SchemaInfo, normalize: boolean = false): Promise { @@ -78,20 +82,19 @@ class MockClient implements Client { return cacheEntry.metadata; } - const id = await this.getIDFromRegistry(subject, schema); - if (id === -1) { + const schemaId = await this.getIDFromRegistry(subject, schema); + if (schemaId.id === -1) { throw new RestError("Failed to retrieve schema ID from registry", 422, 42200); } - const metadata: SchemaMetadata = { ...schema, id }; + const metadata: SchemaMetadata = { id: schemaId.id!, guid: schemaId.guid!, ...schema }; this.infoToSchemaCache.set(cacheKey, { metadata, softDeleted: false }); return metadata; } - private async getIDFromRegistry(subject: string, schema: SchemaInfo): Promise { + private async getIDFromRegistry(subject: string, schema: SchemaInfo): Promise { let id = -1; - for (const [key, value] of this.idToSchemaCache.entries()) { const parsedKey = JSON.parse(key); if (parsedKey.subject === subject && this.schemasEqual(value.info, schema)) { @@ -100,14 +103,24 @@ class MockClient implements Client { } } + let guid = ""; + for (const [key, value] of this.guidToSchemaCache.entries()) { + if (this.schemasEqual(value.info, schema)) { + guid = key; + break; + } + } + await this.generateVersion(subject, schema); if (id < 0) { id = this.counter.increment(); const idCacheKey = stringify({ subject, id }); this.idToSchemaCache.set(idCacheKey, { info: schema, softDeleted: false }); + guid = v4() + this.guidToSchemaCache.set(guid, { info: schema, softDeleted: false }); } - return id; + return new SchemaId("", id, guid); } private async generateVersion(subject: 
string, schema: SchemaInfo): Promise { @@ -134,13 +147,27 @@ class MockClient implements Client { return cacheEntry.info; } + async getByGuid(guid: string, format?: string): Promise { + const cacheEntry = this.guidToSchemaCache.get(guid); + + if (!cacheEntry || cacheEntry.softDeleted) { + throw new RestError("Schema not found", 404, 40400); + } + return cacheEntry.info; + } + async getId(subject: string, schema: SchemaInfo): Promise { + const metadata = await this.getIdFullResponse(subject, schema); + return metadata.id!; + } + + async getIdFullResponse(subject: string, schema: SchemaInfo): Promise { const cacheKey = stringify({ subject, schema: minimize(schema) }); const cacheEntry = this.infoToSchemaCache.get(cacheKey); if (!cacheEntry || cacheEntry.softDeleted) { throw new RestError("Schema not found", 404, 40400); } - return cacheEntry.metadata.id; + return cacheEntry.metadata; } async getLatestSchemaMetadata(subject: string, format?: string): Promise { @@ -158,6 +185,7 @@ class MockClient implements Client { const parsedKey = JSON.parse(key); if (parsedKey.subject === subject && value.version === version) { json = parsedKey; + break } } @@ -170,14 +198,26 @@ class MockClient implements Client { const parsedKey = JSON.parse(key); if (parsedKey.subject === subject && value.info.schema === json.schema.schema) { id = parsedKey.id; + break } } if (id === -1) { throw new RestError("Schema not found", 404, 40400); } + let guid: string = ""; + for (const [key, value] of this.guidToSchemaCache.entries()) { + if (value.info.schema === json.schema.schema) { + guid = key + break + } + } + if (guid === "") { + throw new RestError("Schema not found", 404, 40400); + } return { id, + guid, version, subject, ...json.schema, @@ -226,6 +266,7 @@ class MockClient implements Client { const parsedKey = JSON.parse(key); if (parsedKey.subject === subject && value.info.schema === latest.schema) { id = parsedKey.id; + break } } if (id === -1) { diff --git a/schemaregistry/package.json b/schemaregistry/package.json index 513d424c..cef2d8ac 100644 --- a/schemaregistry/package.json +++ b/schemaregistry/package.json @@ -1,6 +1,6 @@ { "name": "@confluentinc/schemaregistry", - "version": "1.3.0", + "version": "1.3.1", "description": "Node.js client for Confluent Schema Registry", "main": "dist/index.js", "types": "dist/index.d.ts", @@ -26,14 +26,14 @@ "node-gyp": "^9.3.1", "ts-jest": "^29.2.4", "typescript": "^5.5.4", - "typescript-eslint": "^8.2.0", - "uuid": "^10.0.0" + "typescript-eslint": "^8.2.0" }, "dependencies": { "@aws-sdk/client-kms": "^3.637.0", "@aws-sdk/credential-providers": "^3.637.0", "@azure/identity": "^4.4.1", "@azure/keyvault-keys": "^4.8.0", + "@bufbuild/cel": "^0.1.0", "@bufbuild/protobuf": "^2.0.0", "@criteria/json-schema": "^0.10.0", "@criteria/json-schema-validation": "^0.10.0", @@ -53,6 +53,7 @@ "lru-cache": "^10.4.3", "node-vault": "^0.10.2", "simple-oauth2": "^5.1.0", + "uuid": "^11.0.0", "validator": "^13.12.0" }, "scripts": { diff --git a/schemaregistry/rules/cel/cel-executor.ts b/schemaregistry/rules/cel/cel-executor.ts new file mode 100644 index 00000000..9b8b8040 --- /dev/null +++ b/schemaregistry/rules/cel/cel-executor.ts @@ -0,0 +1,87 @@ +import {RuleRegistry} from "../../serde/rule-registry"; +import {RuleContext, RuleExecutor} from "../../serde/serde"; +import {ClientConfig} from "../../rest-service"; +import stringify from "json-stringify-deterministic"; +import {LRUCache} from "lru-cache"; +import {createEnv} from "@bufbuild/cel"; +import {createRegistry} from 
"@bufbuild/protobuf"; + +export class CelExecutor implements RuleExecutor { + config: Map | null = null + env = createEnv("", createRegistry()); + cache: LRUCache = new LRUCache({max: 1000}) + + static register(): CelExecutor { + const executor = new CelExecutor() + RuleRegistry.registerRuleExecutor(executor) + return executor + } + + configure(clientConfig: ClientConfig, config: Map) { + this.config = config + } + + type(): string { + return "CEL" + } + + async transform(ctx: RuleContext, msg: any): Promise { + const args = { + message: msg + } + return await this.execute(ctx, msg, args) + } + + async execute(ctx: RuleContext, msg: any, args: { [key: string]: any }): Promise { + let expr = ctx.rule.expr + if (expr == null) { + return msg + } + const index = expr.indexOf(';') + if (index >= 0) { + const guard = expr.substring(0, index) + if (guard.trim().length != 0) { + const guardResult = await this.executeRule(ctx, guard, msg, args) + if (guardResult === false) { + // skip the expr + if (ctx.rule.kind === 'CONDITION') { + return true + } + return msg + } + } + expr = expr.substring(index + 1) + } + return await this.executeRule(ctx, expr, msg, args) + } + + async executeRule(ctx: RuleContext, expr: string, obj: any, args: { [key: string]: any }): Promise { + const schema = ctx.target.schema + const scriptType = ctx.target.schemaType + const rule: RuleWithArgs = { + rule: expr, + scriptType: scriptType, + schema: schema + } + const ruleJson = stringify(rule) + let program = this.cache.get(ruleJson) + if (program == null) { + const parsedExpr = this.env.parse(expr) + program = this.env.plan(parsedExpr) + this.cache.set(ruleJson, program) + } + for (const [key, value] of Object.entries(args)) { + this.env.set(key, value) + } + return this.env.eval(program) + } + + async close(): Promise { + } +} + +interface RuleWithArgs { + rule?: string + scriptType?: string + schema?: string +} diff --git a/schemaregistry/rules/cel/cel-field-executor.ts b/schemaregistry/rules/cel/cel-field-executor.ts new file mode 100644 index 00000000..3f3b9fb2 --- /dev/null +++ b/schemaregistry/rules/cel/cel-field-executor.ts @@ -0,0 +1,60 @@ +import {RuleRegistry} from "../../serde/rule-registry"; +import { + FieldContext, + FieldRuleExecutor, + FieldTransform, + RuleContext, +} from "../../serde/serde"; +import {ClientConfig} from "../../rest-service"; +import {CelExecutor} from "./cel-executor"; + +export class CelFieldExecutor extends FieldRuleExecutor { + executor: CelExecutor = new CelExecutor() + + static register(): CelFieldExecutor { + const executor = new CelFieldExecutor() + RuleRegistry.registerRuleExecutor(executor) + return executor + } + + configure(clientConfig: ClientConfig, config: Map) { + this.config = config + } + + type(): string { + return "CEL_FIELD" + } + + override newTransform(ctx: RuleContext): FieldTransform { + return new CelFieldExecutorTransform(this.executor) + } + + async close(): Promise { + } +} + +export class CelFieldExecutorTransform implements FieldTransform { + private executor: CelExecutor + + constructor(executor: CelExecutor) { + this.executor = executor + } + + async transform(ctx: RuleContext, fieldCtx: FieldContext, fieldValue: any): Promise { + if (fieldValue == null) { + return null + } + if (!fieldCtx.isPrimitive()) { + return fieldValue + } + const args = { + value: fieldValue, + fullName: fieldCtx.fullName, + name: fieldCtx.name, + typeName: fieldCtx.typeName(), + tags: Array.from(fieldCtx.tags), + message: fieldCtx.containingMessage + } + return await 
this.executor.execute(ctx, fieldValue, args) + } +} diff --git a/schemaregistry/rules/encryption/encrypt-executor.ts b/schemaregistry/rules/encryption/encrypt-executor.ts index 1143bc51..da96734d 100644 --- a/schemaregistry/rules/encryption/encrypt-executor.ts +++ b/schemaregistry/rules/encryption/encrypt-executor.ts @@ -3,7 +3,7 @@ import { FieldRuleExecutor, FieldTransform, FieldType, - MAGIC_BYTE, + MAGIC_BYTE_V0, RuleContext, RuleError, } from "../../serde/serde"; @@ -93,18 +93,20 @@ } if (this.config != null) { - for (let [key, value] of config) { - let v = this.config.get(key) - if (v != null) { - if (v !== value) { - throw new RuleError('rule config key already set: {key}') + if (config != null) { + for (let [key, value] of config) { + let v = this.config.get(key) + if (v != null) { + if (v !== value) { + throw new RuleError(`rule config key already set: ${key}`) + } + } else { + this.config.set(key, value) } - } else { - this.config.set(key, value) } } } else { - this.config = config + this.config = config != null ? config : new Map() } } @@ -536,15 +538,15 @@ prefixVersion(version: number, ciphertext: Buffer): Buffer { const versionBuf = Buffer.alloc(4) versionBuf.writeInt32BE(version) - return Buffer.concat([MAGIC_BYTE, versionBuf, ciphertext]) + return Buffer.concat([MAGIC_BYTE_V0, versionBuf, ciphertext]) } extractVersion(ciphertext: Buffer): number | null { let magicByte = ciphertext.subarray(0, 1) - if (!magicByte.equals(MAGIC_BYTE)) { + if (!magicByte.equals(MAGIC_BYTE_V0)) { throw new RuleError( `Message encoded with magic byte ${JSON.stringify(magicByte)}, expected ${JSON.stringify( - MAGIC_BYTE, + MAGIC_BYTE_V0, )}`, ) } diff --git a/schemaregistry/schemaregistry-client.ts b/schemaregistry/schemaregistry-client.ts index e323cbbc..aeeccf9e 100644 --- a/schemaregistry/schemaregistry-client.ts +++ b/schemaregistry/schemaregistry-client.ts @@ -81,7 +81,8 @@ export function minimize(info: SchemaInfo): SchemaInfo { * SchemaMetadata extends SchemaInfo with additional metadata */ export interface SchemaMetadata extends SchemaInfo { - id: number; + id?: number; + guid?: string; subject?: string; version?: number; } @@ -141,7 +142,9 @@ export interface Client { register(subject: string, schema: SchemaInfo, normalize: boolean): Promise; registerFullResponse(subject: string, schema: SchemaInfo, normalize: boolean): Promise; getBySubjectAndId(subject: string, id: number, format?: string): Promise; + getByGuid(guid: string, format?: string): Promise; getId(subject: string, schema: SchemaInfo, normalize: boolean): Promise; + getIdFullResponse(subject: string, schema: SchemaInfo, normalize: boolean): Promise; getLatestSchemaMetadata(subject: string, format?: string): Promise; getSchemaMetadata(subject: string, version: number, deleted: boolean, format?: string): Promise; getLatestWithMetadata(subject: string, metadata: { [key: string]: string }, @@ -174,8 +177,8 @@ export class SchemaRegistryClient implements Client { private clientConfig: ClientConfig; private restService: RestService; - private schemaToIdCache: LRUCache; private idToSchemaInfoCache: LRUCache; + private guidToSchemaInfoCache: LRUCache; private infoToSchemaCache: LRUCache; private latestToSchemaCache: LRUCache; private schemaToVersionCache: LRUCache; @@ -184,6 +187,7 @@ private schemaToIdMutex: Mutex; private idToSchemaInfoMutex:
Mutex; + private guidToSchemaInfoMutex: Mutex; private infoToSchemaMutex: Mutex; private latestToSchemaMutex: Mutex; private schemaToVersionMutex: Mutex; @@ -205,8 +209,8 @@ export class SchemaRegistryClient implements Client { config.basicAuthCredentials, config.bearerAuthCredentials, config.maxRetries, config.retriesWaitMs, config.retriesMaxWaitMs); - this.schemaToIdCache = new LRUCache(cacheOptions); this.idToSchemaInfoCache = new LRUCache(cacheOptions); + this.guidToSchemaInfoCache = new LRUCache(cacheOptions); this.infoToSchemaCache = new LRUCache(cacheOptions); this.latestToSchemaCache = new LRUCache(cacheOptions); this.schemaToVersionCache = new LRUCache(cacheOptions); @@ -214,6 +218,7 @@ export class SchemaRegistryClient implements Client { this.metadataToSchemaCache = new LRUCache(cacheOptions); this.schemaToIdMutex = new Mutex(); this.idToSchemaInfoMutex = new Mutex(); + this.guidToSchemaInfoMutex = new Mutex(); this.infoToSchemaMutex = new Mutex(); this.latestToSchemaMutex = new Mutex(); this.schemaToVersionMutex = new Mutex(); @@ -242,7 +247,7 @@ export class SchemaRegistryClient implements Client { async register(subject: string, schema: SchemaInfo, normalize: boolean = false): Promise { const metadataResult = await this.registerFullResponse(subject, schema, normalize); - return metadataResult.id; + return metadataResult.id!; } /** @@ -299,6 +304,29 @@ export class SchemaRegistryClient implements Client { }); } + /** + * Get a schema by GUID. + * @param guid - The schema GUID. + * @param format - The format of the schema. + */ + async getByGuid(guid: string, format?: string): Promise { + return await this.guidToSchemaInfoMutex.runExclusive(async () => { + const cachedSchema: SchemaInfo | undefined = this.guidToSchemaInfoCache.get(guid); + if (cachedSchema) { + return cachedSchema; + } + + let formatStr = format != null ? `?format=${format}` : ''; + + const response: AxiosResponse = await this.restService.handleRequest( + `/schemas/guids/${guid}${formatStr}`, + 'GET' + ); + this.guidToSchemaInfoCache.set(guid, response.data); + return response.data; + }); + } + /** * Get the ID for a schema. * @param subject - The subject under which the schema is registered. @@ -306,12 +334,28 @@ export class SchemaRegistryClient implements Client { * @param normalize - Whether to normalize the schema before getting the ID. */ async getId(subject: string, schema: SchemaInfo, normalize: boolean = false): Promise { + const metadataResult = await this.getIdFullResponse(subject, schema, normalize); + + return metadataResult.id!; + } + + /** + * Get the ID for a schema. + * @param subject - The subject under which the schema is registered. + * @param schema - The schema whose ID to get. + * @param normalize - Whether to normalize the schema before getting the ID. 
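A hypothetical round trip through the GUID-based lookup added here (the client is assumed to be already constructed; `subject` and `schemaInfo` are placeholders, and the import path assumes the client module's own directory):

import { SchemaInfo, SchemaRegistryClient } from "./schemaregistry-client";

declare const client: SchemaRegistryClient;
declare const subject: string;
declare const schemaInfo: SchemaInfo;

async function example(): Promise<void> {
  // registerFullResponse now returns both the numeric id and the guid.
  const metadata = await client.registerFullResponse(subject, schemaInfo, false);

  // A guid identifies the schema globally, so no subject is needed to
  // resolve it later (backed by GET /schemas/guids/{guid} plus an LRU cache).
  if (metadata.guid != null) {
    const info = await client.getByGuid(metadata.guid);
    console.log(info.schema);
  }
}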
+ */ + async getIdFullResponse(subject: string, schema: SchemaInfo, normalize: boolean = false): Promise { const cacheKey = stringify({ subject, schema: minimize(schema) }); return await this.schemaToIdMutex.runExclusive(async () => { - const cachedId: number | undefined = this.schemaToIdCache.get(cacheKey); - if (cachedId) { - return cachedId; + const cachedSchemaMetadata: SchemaMetadata | undefined = this.infoToSchemaCache.get(cacheKey); + if (cachedSchemaMetadata) { + // Allow the schema to be looked up again if version is not valid + // This is for backward compatibility with versions before CP 8.0 + if (cachedSchemaMetadata.version != null && cachedSchemaMetadata.version > 0) { + return cachedSchemaMetadata; + } } subject = encodeURIComponent(subject); @@ -321,8 +365,8 @@ export class SchemaRegistryClient implements Client { 'POST', schema ); - this.schemaToIdCache.set(cacheKey, response.data.id); - return response.data.id; + this.infoToSchemaCache.set(cacheKey, response.data); + return response.data; }); } @@ -716,8 +760,8 @@ export class SchemaRegistryClient implements Client { * Clear all caches. */ clearCaches(): void { - this.schemaToIdCache.clear(); this.idToSchemaInfoCache.clear(); + this.guidToSchemaInfoCache.clear(); this.infoToSchemaCache.clear(); this.latestToSchemaCache.clear(); this.schemaToVersionCache.clear(); @@ -761,6 +805,12 @@ export class SchemaRegistryClient implements Client { }); } + async addToGuidToSchemaInfoCache(guid: string, schema: SchemaInfo): Promise { + await this.guidToSchemaInfoMutex.runExclusive(async () => { + this.guidToSchemaInfoCache.set(guid, schema); + }); + } + async getInfoToSchemaCacheSize(): Promise { return await this.infoToSchemaMutex.runExclusive(async () => { return this.infoToSchemaCache.size; @@ -784,4 +834,10 @@ export class SchemaRegistryClient implements Client { return this.idToSchemaInfoCache.size; }); } + + async getGuidToSchemaInfoCacheSize(): Promise { + return await this.guidToSchemaInfoMutex.runExclusive(async () => { + return this.guidToSchemaInfoCache.size; + }); + } } diff --git a/schemaregistry/serde/avro.ts b/schemaregistry/serde/avro.ts index a0c9218f..3d47a9d5 100644 --- a/schemaregistry/serde/avro.ts +++ b/schemaregistry/serde/avro.ts @@ -3,7 +3,7 @@ import { FieldTransform, FieldType, Migration, RefResolver, RuleConditionError, - RuleContext, SerdeType, + RuleContext, SchemaId, SerdeType, Serializer, SerializerConfig } from "./serde"; import { @@ -20,6 +20,9 @@ import Field = types.Field import { LRUCache } from 'lru-cache' import {RuleRegistry} from "./rule-registry"; import stringify from "json-stringify-deterministic"; +import {IHeaders} from "../../types/kafkajs"; + +export const AVRO_TYPE = "AVRO" type TypeHook = (schema: avro.Schema, opts: ForSchemaOptions) => Type | undefined @@ -62,8 +65,9 @@ export class AvroSerializer extends Serializer implements AvroSerde { * serialize is used to serialize a message using Avro. 
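The new optional `headers` parameter threads Kafka message headers through the serde layer. A sketch of the intended call pattern; the constructor arguments (`SerdeType.VALUE`, `autoRegisterSchemas`) and the import paths are assumptions, not part of this patch:

import { AvroSerializer, AvroDeserializer } from "./avro";
import { SerdeType } from "./serde";
import { SchemaRegistryClient } from "../schemaregistry-client";
import { IHeaders } from "../../types/kafkajs";

declare const client: SchemaRegistryClient; // construction elided

async function roundTrip(topic: string, message: any): Promise<any> {
  const serializer = new AvroSerializer(client, SerdeType.VALUE, { autoRegisterSchemas: true });
  const deserializer = new AvroDeserializer(client, SerdeType.VALUE, {});

  // Pass the same headers object on both paths: depending on the configured
  // schemaIdSerializer, the schema id/guid may travel in the
  // __value_schema_id header rather than as a payload prefix.
  const headers: IHeaders = {};
  const payload = await serializer.serialize(topic, message, headers);
  return await deserializer.deserialize(topic, payload, headers);
}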
* @param topic - the topic to serialize the message for * @param msg - the message to serialize + * @param headers - optional headers */ - override async serialize(topic: string, msg: any): Promise { + override async serialize(topic: string, msg: any, headers?: IHeaders): Promise { if (this.client == null) { throw new Error('client is not initialized') } @@ -82,7 +86,7 @@ export class AvroSerializer extends Serializer implements AvroSerde { schema: JSON.stringify(avroSchema), } } - const [id, info] = await this.getId(topic, msg, schema) + const [schemaId, info] = await this.getSchemaId(AVRO_TYPE, topic, msg, schema) let avroType: avro.Type let deps: Map [avroType, deps] = await this.toType(info) @@ -90,7 +94,7 @@ export class AvroSerializer extends Serializer implements AvroSerde { msg = await this.executeRules( subject, topic, RuleMode.WRITE, null, info, msg, getInlineTags(info, deps)) const msgBytes = avroType.toBuffer(msg) - return this.writeBytes(id, msgBytes) + return this.serializeSchemaId(topic, msgBytes, schemaId, headers) } async fieldTransform(ctx: RuleContext, fieldTransform: FieldTransform, msg: any): Promise { @@ -164,7 +168,13 @@ export class AvroDeserializer extends Deserializer implements AvroSerde { } } - override async deserialize(topic: string, payload: Buffer): Promise { + /** + * Deserializes a message. + * @param topic - the topic + * @param payload - the message payload + * @param headers - optional headers + */ + override async deserialize(topic: string, payload: Buffer, headers?: IHeaders): Promise { if (!Buffer.isBuffer(payload)) { throw new Error('Invalid buffer') } @@ -172,7 +182,9 @@ export class AvroDeserializer extends Deserializer implements AvroSerde { return null } - const info = await this.getSchema(topic, payload) + const schemaId = new SchemaId(AVRO_TYPE) + const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers) + payload = payload.subarray(bytesRead) const subject = this.subjectName(topic, info) const readerMeta = await this.getReaderSchema(subject) let migrations: Migration[] = [] @@ -182,7 +194,7 @@ export class AvroDeserializer extends Deserializer implements AvroSerde { const [writer, deps] = await this.toType(info) let msg: any - const msgBytes = payload.subarray(5) + const msgBytes = payload if (migrations.length > 0) { msg = writer.fromBuffer(msgBytes) msg = await this.executeMigrations(migrations, subject, topic, msg) diff --git a/schemaregistry/serde/json.ts b/schemaregistry/serde/json.ts index d91037ad..860a70f3 100644 --- a/schemaregistry/serde/json.ts +++ b/schemaregistry/serde/json.ts @@ -2,7 +2,7 @@ import { Deserializer, DeserializerConfig, FieldTransform, FieldType, Migration, RefResolver, RuleConditionError, - RuleContext, + RuleContext, SchemaId, SerdeType, SerializationError, Serializer, SerializerConfig } from "./serde"; @@ -30,6 +30,9 @@ import { LRUCache } from "lru-cache"; import { generateSchema } from "./json-util"; import {RuleRegistry} from "./rule-registry"; import stringify from "json-stringify-deterministic"; +import {IHeaders} from "../../types/kafkajs"; + +export const JSON_TYPE = "JSON" export interface ValidateFunction { (this: any, data: any): boolean @@ -82,8 +85,9 @@ export class JsonSerializer extends Serializer implements JsonSerde { * Serializes a message. 
* @param topic - the topic * @param msg - the message + * @param headers - optional headers */ - override async serialize(topic: string, msg: any): Promise { + override async serialize(topic: string, msg: any, headers?: IHeaders): Promise { if (this.client == null) { throw new Error('client is not initialized') } @@ -102,7 +106,7 @@ export class JsonSerializer extends Serializer implements JsonSerde { schema: JSON.stringify(jsonSchema), } } - const [id, info] = await this.getId(topic, msg, schema) + const [schemaId, info] = await this.getSchemaId(JSON_TYPE, topic, msg, schema) const subject = this.subjectName(topic, info) msg = await this.executeRules(subject, topic, RuleMode.WRITE, null, info, msg, null) const msgBytes = Buffer.from(JSON.stringify(msg)) @@ -112,7 +116,7 @@ export class JsonSerializer extends Serializer implements JsonSerde { throw new SerializationError('Invalid message') } } - return this.writeBytes(id, msgBytes) + return this.serializeSchemaId(topic, msgBytes, schemaId, headers) } async fieldTransform(ctx: RuleContext, fieldTransform: FieldTransform, msg: any): Promise { @@ -180,8 +184,9 @@ export class JsonDeserializer extends Deserializer implements JsonSerde { * Deserializes a message. * @param topic - the topic * @param payload - the message payload + * @param headers - optional headers */ - override async deserialize(topic: string, payload: Buffer): Promise { + override async deserialize(topic: string, payload: Buffer, headers?: IHeaders): Promise { if (!Buffer.isBuffer(payload)) { throw new Error('Invalid buffer') } @@ -189,14 +194,16 @@ export class JsonDeserializer extends Deserializer implements JsonSerde { return null } - const info = await this.getSchema(topic, payload) + const schemaId = new SchemaId(JSON_TYPE) + const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers) + payload = payload.subarray(bytesRead) const subject = this.subjectName(topic, info) const readerMeta = await this.getReaderSchema(subject) let migrations: Migration[] = [] if (readerMeta != null) { migrations = await this.getMigrations(subject, info, readerMeta) } - const msgBytes = payload.subarray(5) + const msgBytes = payload let msg = JSON.parse(msgBytes.toString()) if (migrations.length > 0) { msg = await this.executeMigrations(migrations, subject, topic, msg) @@ -207,7 +214,7 @@ export class JsonDeserializer extends Deserializer implements JsonSerde { } else { target = info } - msg = this.executeRules(subject, topic, RuleMode.READ, null, target, msg, null) + msg = await this.executeRules(subject, topic, RuleMode.READ, null, target, msg, null) if ((this.conf as JsonSerdeConfig).validate) { const validate = await this.toValidateFunction(info) if (validate != null && !validate(JSON.parse(msg))) { diff --git a/schemaregistry/serde/protobuf.ts b/schemaregistry/serde/protobuf.ts index 9dbcd19a..57977e42 100644 --- a/schemaregistry/serde/protobuf.ts +++ b/schemaregistry/serde/protobuf.ts @@ -3,7 +3,7 @@ import { DeserializerConfig, FieldTransform, FieldType, RuleConditionError, - RuleContext, + RuleContext, SchemaId, SerdeType, SerializationError, Serializer, SerializerConfig @@ -38,7 +38,6 @@ import { FileDescriptorProto, FileDescriptorProtoSchema } from "@bufbuild/protobuf/wkt"; -import { BufferWrapper, MAX_VARINT_LEN_64 } from "./buffer-wrapper"; import { LRUCache } from "lru-cache"; import {field_meta, file_confluent_meta, Meta} from "../confluent/meta_pb"; import {RuleRegistry} from "./rule-registry"; @@ -57,6 +56,9 @@ import 
{file_google_type_postal_address} from "../google/type/postal_address_pb" import {file_google_type_quaternion} from "../google/type/quaternion_pb"; import {file_google_type_timeofday} from "../google/type/timeofday_pb"; import {file_google_type_month} from "../google/type/month_pb"; +import {IHeaders} from "../../types/kafkajs"; + +export const PROTOBUF_TYPE = "PROTOBUF" const builtinDeps = new Map([ ['confluent/meta.proto', file_confluent_meta], @@ -132,8 +134,9 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde { * Serializes a message. * @param topic - the topic * @param msg - the message + * @param headers - optional headers */ - override async serialize(topic: string, msg: any): Promise { + override async serialize(topic: string, msg: any, headers?: IHeaders): Promise { if (this.client == null) { throw new Error('client is not initialized') } @@ -158,12 +161,12 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde { const fileDesc = messageDesc.file schema = await this.getSchemaInfo(fileDesc) } - const [id, info] = await this.getId(topic, msg, schema, 'serialized') + const [schemaId, info] = await this.getSchemaId(PROTOBUF_TYPE, topic, msg, schema, 'serialized') const subject = this.subjectName(topic, info) msg = await this.executeRules(subject, topic, RuleMode.WRITE, null, info, msg, null) - const msgIndexBytes = this.toMessageIndexBytes(messageDesc) + schemaId.messageIndexes = this.toMessageIndexArray(messageDesc) const msgBytes = Buffer.from(toBinary(messageDesc, msg)) - return this.writeBytes(id, Buffer.concat([msgIndexBytes, msgBytes])) + return this.serializeSchemaId(topic, msgBytes, schemaId, headers) } async getSchemaInfo(fileDesc: DescFile): Promise { @@ -235,6 +238,8 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde { } return { id: id, + // TODO verify that guid is not required + guid: "", subject: subject, version: version, schema: info.schema, @@ -245,15 +250,8 @@ export class ProtobufSerializer extends Serializer implements ProtobufSerde { } } - toMessageIndexBytes(messageDesc: DescMessage): Buffer { - const msgIndexes: number[] = this.toMessageIndexes(messageDesc, 0) - const buffer = Buffer.alloc((1 + msgIndexes.length) * MAX_VARINT_LEN_64) - const bw = new BufferWrapper(buffer) - bw.writeVarInt(msgIndexes.length) - for (let i = 0; i < msgIndexes.length; i++) { - bw.writeVarInt(msgIndexes[i]) - } - return buffer.subarray(0, bw.pos) + toMessageIndexArray(messageDesc: DescMessage): number[] { + return this.toMessageIndexes(messageDesc, 0) } toMessageIndexes(messageDesc: DescMessage, count: number): number[] { @@ -366,8 +364,9 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde * Deserializes a message. 
* @param topic - the topic * @param payload - the message payload + * @param headers - optional headers */ - override async deserialize(topic: string, payload: Buffer): Promise { + override async deserialize(topic: string, payload: Buffer, headers?: IHeaders): Promise { if (!Buffer.isBuffer(payload)) { throw new Error('Invalid buffer') } @@ -375,15 +374,16 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde return null } - const info = await this.getSchema(topic, payload, 'serialized') + const schemaId = new SchemaId(PROTOBUF_TYPE) + const [info, bytesRead] = await this.getWriterSchema(topic, payload, schemaId, headers, 'serialized') + payload = payload.subarray(bytesRead) const fd = await this.toFileDesc(this.client, info) - const [bytesRead, msgIndexes] = this.readMessageIndexes(payload.subarray(5)) - const messageDesc = this.toMessageDescFromIndexes(fd, msgIndexes) + const messageDesc = this.toMessageDescFromIndexes(fd, schemaId.messageIndexes!) const subject = this.subjectName(topic, info) const readerMeta = await this.getReaderSchema(subject, 'serialized') - const msgBytes = payload.subarray(5 + bytesRead) + const msgBytes = payload let msg = fromBinary(messageDesc, msgBytes) // Currently JavaScript does not support migration rules @@ -436,16 +436,6 @@ export class ProtobufDeserializer extends Deserializer implements ProtobufSerde throw new SerializationError('message descriptor not found') } - readMessageIndexes(payload: Buffer): [number, number[]] { - const bw = new BufferWrapper(payload) - const count = bw.readVarInt() - const msgIndexes = [] - for (let i = 0; i < count; i++) { - msgIndexes.push(bw.readVarInt()) - } - return [bw.pos, msgIndexes] - } - toMessageDescFromIndexes(fd: DescFile, msgIndexes: number[]): DescMessage { let index = msgIndexes[0] if (msgIndexes.length === 1) { diff --git a/schemaregistry/serde/serde.ts b/schemaregistry/serde/serde.ts index a33ac2da..19fefe65 100644 --- a/schemaregistry/serde/serde.ts +++ b/schemaregistry/serde/serde.ts @@ -9,6 +9,8 @@ import { } from "../schemaregistry-client"; import {RuleRegistry} from "./rule-registry"; import {ClientConfig} from "../rest-service"; +import {BufferWrapper, MAX_VARINT_LEN_64} from "./buffer-wrapper"; +import {IHeaders} from "../../types/kafkajs"; export enum SerdeType { KEY = 'KEY', @@ -16,6 +18,139 @@ export enum SerdeType { } export const MAGIC_BYTE = Buffer.alloc(1) +export const MAGIC_BYTE_V0 = MAGIC_BYTE +export const MAGIC_BYTE_V1 = Buffer.alloc(1, 1) + +export const KEY_SCHEMA_ID_HEADER = '__key_schema_id' +export const VALUE_SCHEMA_ID_HEADER = '__value_schema_id' + +const byteToHex: string[] = []; +for (let i = 0; i < 256; ++i) { + byteToHex.push((i + 0x100).toString(16).slice(1)); +} + +export class SchemaId { + schemaType: string + id?: number + guid?: string + messageIndexes?: number[] + + constructor(schemaType: string, id?: number, guid?: string, messageIndexes?: number[]) { + this.schemaType = schemaType + this.id = id + this.guid = guid + this.messageIndexes = messageIndexes + } + + fromBytes(payload: Buffer): number { + let totalBytesRead = 0 + const magicByte = payload.subarray(0, 1) + if (magicByte.equals(MAGIC_BYTE_V0)) { + this.id = payload.subarray(1, 5).readInt32BE(0) + totalBytesRead = 5 + } else if (magicByte.equals(MAGIC_BYTE_V1)) { + this.guid = this.stringifyUuid(payload.subarray(1, 17)) + totalBytesRead = 17 + } else { + throw new SerializationError( + `Unknown magic byte ${JSON.stringify(magicByte)}` + ) + } + if (this.schemaType == "PROTOBUF") { + 
+      const [bytesRead, msgIndexes] = this.readMessageIndexes(payload.subarray(totalBytesRead))
+      this.messageIndexes = msgIndexes
+      totalBytesRead += bytesRead
+    }
+    return totalBytesRead
+  }
+
+  stringifyUuid(arr: Uint8Array, offset = 0) {
+    return (byteToHex[arr[offset + 0]] +
+      byteToHex[arr[offset + 1]] +
+      byteToHex[arr[offset + 2]] +
+      byteToHex[arr[offset + 3]] +
+      '-' +
+      byteToHex[arr[offset + 4]] +
+      byteToHex[arr[offset + 5]] +
+      '-' +
+      byteToHex[arr[offset + 6]] +
+      byteToHex[arr[offset + 7]] +
+      '-' +
+      byteToHex[arr[offset + 8]] +
+      byteToHex[arr[offset + 9]] +
+      '-' +
+      byteToHex[arr[offset + 10]] +
+      byteToHex[arr[offset + 11]] +
+      byteToHex[arr[offset + 12]] +
+      byteToHex[arr[offset + 13]] +
+      byteToHex[arr[offset + 14]] +
+      byteToHex[arr[offset + 15]]).toLowerCase();
+  }
+
+  idToBytes() : Buffer {
+    if (this.id == null) {
+      throw new SerializationError('Schema id is not set')
+    }
+    const idBuffer = Buffer.alloc(4)
+    idBuffer.writeInt32BE(this.id!, 0)
+    if (this.messageIndexes != null) {
+      return Buffer.concat([MAGIC_BYTE_V0, idBuffer, this.writeMessageIndexes(this.messageIndexes)])
+    }
+    return Buffer.concat([MAGIC_BYTE_V0, idBuffer])
+  }
+
+  guidToBytes() : Buffer {
+    if (this.guid == null) {
+      throw new SerializationError('Schema guid is not set')
+    }
+    const guidBuffer = Buffer.from(this.parseUuid(this.guid!))
+    if (this.messageIndexes != null) {
+      return Buffer.concat([MAGIC_BYTE_V1, guidBuffer, this.writeMessageIndexes(this.messageIndexes)])
+    }
+    return Buffer.concat([MAGIC_BYTE_V1, guidBuffer])
+  }
+
+  parseUuid(uuid: string): Uint8Array {
+    let v;
+    return Uint8Array.of(
+      (v = parseInt(uuid.slice(0, 8), 16)) >>> 24,
+      (v >>> 16) & 0xff,
+      (v >>> 8) & 0xff, v & 0xff,
+      (v = parseInt(uuid.slice(9, 13), 16)) >>> 8, v & 0xff,
+      (v = parseInt(uuid.slice(14, 18), 16)) >>> 8, v & 0xff,
+      (v = parseInt(uuid.slice(19, 23), 16)) >>> 8, v & 0xff,
+      ((v = parseInt(uuid.slice(24, 36), 16)) / 0x10000000000) & 0xff,
+      (v / 0x100000000) & 0xff, (v >>> 24) & 0xff, (v >>> 16) & 0xff, (v >>> 8) & 0xff, v & 0xff);
+  }
+
+  readMessageIndexes(payload: Buffer): [number, number[]] {
+    const bw = new BufferWrapper(payload)
+    const count = bw.readVarInt()
+    if (count == 0) {
+      return [1, [0]]
+    }
+    const msgIndexes = []
+    for (let i = 0; i < count; i++) {
+      msgIndexes.push(bw.readVarInt())
+    }
+    return [bw.pos, msgIndexes]
+  }
+
+  writeMessageIndexes(msgIndexes: number[]): Buffer {
+    if (msgIndexes.length === 1 && msgIndexes[0] === 0) {
+      const buffer = Buffer.alloc(1)
+      buffer.writeUInt8(0, 0)
+      return buffer
+    }
+    const buffer = Buffer.alloc((1 + msgIndexes.length) * MAX_VARINT_LEN_64)
+    const bw = new BufferWrapper(buffer)
+    bw.writeVarInt(msgIndexes.length)
+    for (let i = 0; i < msgIndexes.length; i++) {
+      bw.writeVarInt(msgIndexes[i])
+    }
+    return buffer.subarray(0, bw.pos)
+  }
+}

/**
 * SerializationError represents a serialization error
 */
@@ -247,6 +382,8 @@ export interface SerializerConfig extends SerdeConfig {
  useSchemaId?: number
  // normalizeSchemas determines whether to normalize schemas
  normalizeSchemas?: boolean
+  // schemaIdSerializer determines how to serialize schema IDs
+  schemaIdSerializer?: SchemaIdSerializerFunc
}

/**
@@ -265,49 +402,52 @@ export abstract class Serializer extends Serde {
   * Serialize serializes a message
   * @param topic - the topic
   * @param msg - the message
+   * @param headers - optional headers
   */
-  abstract serialize(topic: string, msg: any): Promise<Buffer>
+  abstract serialize(topic: string, msg: any, headers?: IHeaders): Promise<Buffer>

-  // GetID returns a schema ID for the given schema
-  async getId(topic: string, msg: any, info?: SchemaInfo, format?: string): Promise<[number, SchemaInfo]> {
+  // GetSchemaID returns a schema ID for the given schema
+  async getSchemaId(schemaType: string, topic: string, msg: any, info?: SchemaInfo, format?: string): Promise<[SchemaId, SchemaInfo]> {
    let autoRegister = this.config().autoRegisterSchemas
    let useSchemaId = this.config().useSchemaId
    let useLatestWithMetadata = this.config().useLatestWithMetadata
    let useLatest = this.config().useLatestVersion
    let normalizeSchema = this.config().normalizeSchemas

-    let id = -1
+    let metadata: SchemaMetadata

    let subject = this.subjectName(topic, info)

    if (autoRegister) {
-      id = await this.client.register(subject, info!, Boolean(normalizeSchema))
+      metadata = await this.client.registerFullResponse(subject, info!, Boolean(normalizeSchema))
    } else if (useSchemaId != null && useSchemaId >= 0) {
      info = await this.client.getBySubjectAndId(subject, useSchemaId, format)
-      id = useSchemaId
+      metadata = await this.client.getIdFullResponse(subject, info!, Boolean(normalizeSchema))
    } else if (useLatestWithMetadata != null && Object.keys(useLatestWithMetadata).length !== 0) {
-      let metadata = await this.client.getLatestWithMetadata(subject, useLatestWithMetadata, true, format)
+      metadata = await this.client.getLatestWithMetadata(subject, useLatestWithMetadata, true, format)
      info = metadata
-      id = metadata.id
    } else if (useLatest) {
-      let metadata = await this.client.getLatestSchemaMetadata(subject, format)
+      metadata = await this.client.getLatestSchemaMetadata(subject, format)
      info = metadata
-      id = metadata.id
    } else {
-      id = await this.client.getId(subject, info!, Boolean(normalizeSchema))
+      metadata = await this.client.getIdFullResponse(subject, info!, Boolean(normalizeSchema))
    }

-    return [id, info!]
+    let schemaId = new SchemaId(schemaType, metadata.id, metadata.guid)
+    return [schemaId, info!]
  }

-  writeBytes(id: number, msgBytes: Buffer): Buffer {
-    const idBuffer = Buffer.alloc(4)
-    idBuffer.writeInt32BE(id, 0)
-    return Buffer.concat([MAGIC_BYTE, idBuffer, msgBytes])
+  serializeSchemaId(topic: string, payload: Buffer, schemaId: SchemaId, headers?: IHeaders): Buffer {
+    const serializer = this.config().schemaIdSerializer ?? PrefixSchemaIdSerializer
+    return serializer(topic, this.serdeType, payload, schemaId, headers)
  }
}

/**
 * DeserializerConfig represents a deserializer configuration
 */
-export type DeserializerConfig = SerdeConfig
+export interface DeserializerConfig extends SerdeConfig {
+  // schemaIdDeserializer determines how to deserialize schema IDs
+  schemaIdDeserializer?: SchemaIdDeserializerFunc
+}
+

/**
 * Migration represents a migration
@@ -334,21 +474,27 @@ export abstract class Deserializer extends Serde {
   * Deserialize deserializes a message
   * @param topic - the topic
   * @param payload - the payload
+   * @param headers - optional headers
   */
-  abstract deserialize(topic: string, payload: Buffer): Promise<any>
+  abstract deserialize(topic: string, payload: Buffer, headers?: IHeaders): Promise<any>

-  async getSchema(topic: string, payload: Buffer, format?: string): Promise<SchemaInfo> {
-    const magicByte = payload.subarray(0, 1)
-    if (!magicByte.equals(MAGIC_BYTE)) {
-      throw new SerializationError(
-        `Message encoded with magic byte ${JSON.stringify(magicByte)}, expected ${JSON.stringify(
-          MAGIC_BYTE,
-        )}`,
-      )
+  deserializeSchemaId(topic: string, payload: Buffer, schemaId: SchemaId, headers?: IHeaders): number {
+    const deserializer = this.config().schemaIdDeserializer ?? DualSchemaIdDeserializer
+    return deserializer(topic, this.serdeType, payload, schemaId, headers)
+  }
+
+  async getWriterSchema(topic: string, payload: Buffer, schemaId: SchemaId, headers?: IHeaders, format?: string): Promise<[SchemaInfo, number]> {
+    const bytesRead = this.deserializeSchemaId(topic, payload, schemaId, headers)
+    let info: SchemaInfo
+    if (schemaId.id != null) {
+      let subject = this.subjectName(topic)
+      info = await this.client.getBySubjectAndId(subject, schemaId.id!, format)
+    } else if (schemaId.guid != null) {
+      info = await this.client.getByGuid(schemaId.guid!, format)
+    } else {
+      throw new SerializationError("Invalid schema ID")
+    }
-    const id = payload.subarray(1, 5).readInt32BE(0)
-    let subject = this.subjectName(topic)
-    return await this.client.getBySubjectAndId(subject, id, format)
+    return [info, bytesRead]
  }

  async getReaderSchema(subject: string, format?: string): Promise<SchemaMetadata | null> {
@@ -400,6 +546,7 @@ export abstract class Deserializer extends Serde {
    let version = await this.client.getVersion(subject, sourceInfo, false, true)
    let source: SchemaMetadata = {
      id: 0,
+      guid: "",
      version: version,
      schema: sourceInfo.schema,
      references: sourceInfo.references,
@@ -501,6 +648,93 @@ export const TopicNameStrategy: SubjectNameStrategyFunc = (topic: string, serdeT
  return topic + suffix
}

+/**
+ * SchemaIdSerializerFunc serializes a schema ID/GUID
+ */
+export type SchemaIdSerializerFunc = (
+  topic: string,
+  serdeType: SerdeType,
+  payload: Buffer,
+  schemaId: SchemaId,
+  headers?: IHeaders,
+) => Buffer
+
+export const HeaderSchemaIdSerializer: SchemaIdSerializerFunc = (
+  topic: string,
+  serdeType: SerdeType,
+  payload: Buffer,
+  schemaId: SchemaId,
+  headers?: IHeaders) => {
+  if (headers == null) {
+    throw new SerializationError('Missing Headers')
+  }
+  let headerKey = serdeType === SerdeType.KEY ? KEY_SCHEMA_ID_HEADER : VALUE_SCHEMA_ID_HEADER
+  headers![headerKey] = schemaId.guidToBytes()
+  return payload
+}
+
+export const PrefixSchemaIdSerializer: SchemaIdSerializerFunc = (
+  topic: string,
+  serdeType: SerdeType,
+  payload: Buffer,
+  schemaId: SchemaId,
+  headers?: IHeaders) => {
+  return Buffer.concat([schemaId.idToBytes(), payload])
+}
+
+/**
+ * SchemaIdDeserializerFunc deserializes a schema ID/GUID
+ */
+export type SchemaIdDeserializerFunc = (
+  topic: string,
+  serdeType: SerdeType,
+  payload: Buffer,
+  schemaId: SchemaId,
+  headers?: IHeaders,
+) => number
+
+export const DualSchemaIdDeserializer: SchemaIdDeserializerFunc = (
+  topic: string,
+  serdeType: SerdeType,
+  payload: Buffer,
+  schemaId: SchemaId,
+  headers?: IHeaders) => {
+  let headerKey = serdeType === SerdeType.KEY ? KEY_SCHEMA_ID_HEADER : VALUE_SCHEMA_ID_HEADER
+  // get header with headerKey from headers
+  if (headers != null) {
+    let headerValues = headers![headerKey]
+    let buf: Buffer | null
+    if (headerValues != null) {
+      if (Array.isArray(headerValues)) {
+        let headerValue = headerValues.length > 0 ? headerValues[headerValues.length - 1] : null
+        if (typeof headerValue === 'string') {
+          buf = Buffer.from(headerValue, 'utf8');
+        } else {
+          buf = headerValue
+        }
+      } else if (typeof headerValues === 'string') {
+        buf = Buffer.from(headerValues, 'utf8');
+      } else {
+        buf = headerValues
+      }
+      if (buf != null) {
+        schemaId.fromBytes(buf)
+        return 0
+      }
+    }
+  }
+  return schemaId.fromBytes(payload)
+}
+
+export const PrefixSchemaIdDeserializer: SchemaIdDeserializerFunc = (
+  topic: string,
+  serdeType: SerdeType,
+  payload: Buffer,
+  schemaId: SchemaId,
+  headers?: IHeaders) => {
+  return schemaId.fromBytes(payload)
+}
+
/**
 * RuleContext represents a rule context
 */
diff --git a/schemaregistry/test/schemaregistry-client.spec.ts b/schemaregistry/test/schemaregistry-client.spec.ts
index 49016172..ef314ee0 100644
--- a/schemaregistry/test/schemaregistry-client.spec.ts
+++ b/schemaregistry/test/schemaregistry-client.spec.ts
@@ -129,6 +129,7 @@ describe('SchemaRegistryClient-Register', () => {
  it('Should return id, version, metadata, and schema when RegisterFullResponse is called', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -145,6 +146,7 @@ describe('SchemaRegistryClient-Register', () => {
  it('Should return id, version, metadata, and schema from cache when RegisterFullResponse is called twice', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -198,13 +200,13 @@ describe('SchemaRegistryClient-Get-ID', () => {
  });

  it('Should return id from cache when GetId is called twice', async () => {
-    restService.handleRequest.mockResolvedValue({ data: { id: 1 } } as AxiosResponse);
+    restService.handleRequest.mockResolvedValue({ data: { id: 1, version: 1 } } as AxiosResponse);

    const response: number = await client.getId(mockSubject, schemaInfo);
    expect(response).toEqual(1);
    expect(restService.handleRequest).toHaveBeenCalledTimes(1);

-    restService.handleRequest.mockResolvedValue({ data: { id: 2 } } as AxiosResponse);
+    restService.handleRequest.mockResolvedValue({ data: { id: 2, version: 1 } } as AxiosResponse);

    const response2: number = await client.getId(mockSubject2, schemaInfo2);
    expect(response2).toEqual(2);
@@ -222,6 +224,7 @@ describe('SchemaRegistryClient-Get-ID', () => {
  it('Should return SchemaInfo when GetBySubjectAndId is called', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -238,6 +241,7 @@ describe('SchemaRegistryClient-Get-ID', () => {
  it('Should return SchemaInfo from cache when GetBySubjectAndId is called twice', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -284,6 +288,7 @@ describe('SchemaRegistryClient-Get-Schema-Metadata', () => {
  it('Should return latest schema with metadata when GetLatestWithMetadata is called', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -300,6 +305,7 @@ describe('SchemaRegistryClient-Get-Schema-Metadata', () => {
  it('Should return latest schema with metadata from cache when GetLatestWithMetadata is called twice', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -335,6 +341,7 @@ describe('SchemaRegistryClient-Get-Schema-Metadata', () => {
  it('Should return SchemaMetadata when GetSchemaMetadata is called', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -351,6 +358,7 @@ describe('SchemaRegistryClient-Get-Schema-Metadata', () => {
  it('Should return SchemaMetadata from cache when GetSchemaMetadata is called twice', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -459,6 +467,7 @@ describe('SchemaRegistryClient-Subjects', () => {
  it('Should delete subject from all caches and registry when deleteSubject is called', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -484,6 +493,7 @@ describe('SchemaRegistryClient-Subjects', () => {
  it('Should delete subject version from all caches and registry when deleteSubjectVersion is called', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
@@ -648,6 +658,7 @@ describe('SchemaRegistryClient-Cache', () => {
  it('Should delete cached item after expiry', async () => {
    const expectedResponse = {
      id: 1,
+      guid: "",
      version: 1,
      schema: schemaString,
      metadata: metadata,
diff --git a/schemaregistry/test/serde/avro.spec.ts b/schemaregistry/test/serde/avro.spec.ts
index 1d1d0591..dd26eb7b 100644
--- a/schemaregistry/test/serde/avro.spec.ts
+++ b/schemaregistry/test/serde/avro.spec.ts
@@ -6,7 +6,7 @@ import {
  AvroSerializer,
  AvroSerializerConfig
} from "../../serde/avro";
-import {SerdeType, Serializer} from "../../serde/serde";
+import {HeaderSchemaIdSerializer, SerdeType, SerializationError, Serializer} from "../../serde/serde";
import {
  Client,
  Rule,
@@ -30,6 +30,8 @@ import {RuleRegistry} from "@confluentinc/schemaregistry/serde/rule-registry";
import {
  clearKmsClients
} from "@confluentinc/schemaregistry/rules/encryption/kms-registry";
+import {CelExecutor} from "../../rules/cel/cel-executor";
+import {CelFieldExecutor} from "../../rules/cel/cel-field-executor";

const rootSchema = `
{
@@ -317,6 +319,8 @@ class FakeClock extends Clock {
}

const fieldEncryptionExecutor = FieldEncryptionExecutor.registerWithClock(new FakeClock())
+CelExecutor.register()
+CelFieldExecutor.register()
JsonataExecutor.register()
AwsKmsDriver.register()
AzureKmsDriver.register()
@@ -364,6 +368,32 @@ describe('AvroSerializer', () => {
    expect(obj2.boolField).toEqual(obj.boolField);
    expect(obj2.bytesField).toEqual(obj.bytesField);
  })
+  it('guid in header', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let ser = new AvroSerializer(client, SerdeType.VALUE,
+      {autoRegisterSchemas: true, schemaIdSerializer: HeaderSchemaIdSerializer})
+    let obj = {
+      intField: 123,
+      doubleField: 45.67,
+      stringField: 'hi',
+      boolField: true,
+      bytesField: Buffer.from([1, 2]),
+    }
+    let headers = {}
+    let bytes = await ser.serialize(topic, obj, headers)
+
+    let deser = new AvroDeserializer(client, SerdeType.VALUE, {})
+    let obj2 = await deser.deserialize(topic, bytes, headers)
+    expect(obj2.intField).toEqual(obj.intField);
+    expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001);
+    expect(obj2.stringField).toEqual(obj.stringField);
+    expect(obj2.boolField).toEqual(obj.boolField);
+    expect(obj2.bytesField).toEqual(obj.bytesField);
+  })
  it('serialize nested', async () => {
    let conf: ClientConfig = {
      baseURLs: [baseURL],
@@ -630,6 +660,393 @@ describe('AvroSerializer', () => {
    expect(obj2.fieldToDelete).toEqual(undefined);
    expect(obj2.newOptionalField).toEqual("optional");
  })
+  it('cel condition', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let serConfig: AvroSerializerConfig = {
+      useLatestVersion: true,
+    }
+    let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
+
+    let encRule: Rule = {
+      name: 'test-cel',
+      kind: 'CONDITION',
+      mode: RuleMode.WRITE,
+      type: 'CEL',
+      expr: "message.stringField == 'hi'"
+    }
+    let ruleSet: RuleSet = {
+      domainRules: [encRule]
+    }
+
+    let info: SchemaInfo = {
+      schemaType: 'AVRO',
+      schema: demoSchema,
+      ruleSet
+    }
+
+    await client.register(subject, info, false)
+
+    let obj = {
+      intField: 123,
+      doubleField: 45.67,
+      stringField: 'hi',
+      boolField: true,
+      bytesField: Buffer.from([1, 2]),
+    }
+    let bytes = await ser.serialize(topic, obj)
+
+    let deserConfig: AvroDeserializerConfig = {
+      ruleConfig: {
+        secret: 'mysecret'
+      }
+    }
+    let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
+    let obj2 = await deser.deserialize(topic, bytes)
+    expect(obj2.intField).toEqual(obj.intField);
+    expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001);
+    expect(obj2.stringField).toEqual(obj.stringField);
+    expect(obj2.boolField).toEqual(obj.boolField);
+    expect(obj2.bytesField).toEqual(obj.bytesField);
+  })
+  it('cel condition fail', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let serConfig: AvroSerializerConfig = {
+      useLatestVersion: true,
+    }
+    let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
+
+    let encRule: Rule = {
+      name: 'test-cel',
+      kind: 'CONDITION',
+      mode: RuleMode.WRITE,
+      type: 'CEL',
+      expr: "message.stringField != 'hi'"
+    }
+    let ruleSet: RuleSet = {
+      domainRules: [encRule]
+    }
+
+    let info: SchemaInfo = {
+      schemaType: 'AVRO',
+      schema: demoSchema,
+      ruleSet
+    }
+
+    await client.register(subject, info, false)
+
+    let obj = {
+      intField: 123,
+      doubleField: 45.67,
+      stringField: 'hi',
+      boolField: true,
+      bytesField: Buffer.from([1, 2]),
+    }
+    try {
+      await ser.serialize(topic, obj)
+      expect(true).toBe(false)
+    } catch (err) {
+      expect(err).toBeInstanceOf(SerializationError)
+    }
+  })
+  it('cel condition ignore fail', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let serConfig: AvroSerializerConfig = {
+      useLatestVersion: true,
+    }
+    let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
+
+    let encRule: Rule = {
+      name: 'test-cel',
+      kind: 'CONDITION',
+      mode: RuleMode.WRITE,
+      type: 'CEL',
+      expr: "message.stringField != 'hi'",
+      onFailure: 'NONE'
+    }
+    let ruleSet: RuleSet = {
+      domainRules: [encRule]
+    }
+
+    let info: SchemaInfo = {
+      schemaType: 'AVRO',
+      schema: demoSchema,
+      ruleSet
+    }
+
+    await client.register(subject, info, false)
+
+    let obj = {
+      intField: 123,
+      doubleField: 45.67,
+      stringField: 'hi',
+      boolField: true,
+      bytesField: Buffer.from([1, 2]),
+    }
+    let bytes = await ser.serialize(topic, obj)
+
+    let deserConfig: AvroDeserializerConfig = {
+      ruleConfig: {
+        secret: 'mysecret'
+      }
+    }
+    let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
+    let obj2 = await deser.deserialize(topic, bytes)
+    expect(obj2.intField).toEqual(obj.intField);
+    expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001);
+    expect(obj2.stringField).toEqual(obj.stringField);
+    expect(obj2.boolField).toEqual(obj.boolField);
+    expect(obj2.bytesField).toEqual(obj.bytesField);
+  })
+  it('cel field transform', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let serConfig: AvroSerializerConfig = {
+      useLatestVersion: true,
+    }
+    let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
+
+    let encRule: Rule = {
+      name: 'test-cel',
+      kind: 'TRANSFORM',
+      mode: RuleMode.WRITE,
+      type: 'CEL_FIELD',
+      expr: "name == 'stringField' ; value + '-suffix'"
+    }
+    let ruleSet: RuleSet = {
+      domainRules: [encRule]
+    }
+
+    let info: SchemaInfo = {
+      schemaType: 'AVRO',
+      schema: demoSchema,
+      ruleSet
+    }
+
+    await client.register(subject, info, false)
+
+    let obj = {
+      intField: 123,
+      doubleField: 45.67,
+      stringField: 'hi',
+      boolField: true,
+      bytesField: Buffer.from([1, 2]),
+    }
+    let bytes = await ser.serialize(topic, obj)
+
+    let deserConfig: AvroDeserializerConfig = {
+      ruleConfig: {
+        secret: 'mysecret'
+      }
+    }
+    let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
+    let obj2 = await deser.deserialize(topic, bytes)
+    expect(obj2.intField).toEqual(obj.intField);
+    expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001);
+    expect(obj2.stringField).toEqual('hi-suffix');
+    expect(obj2.boolField).toEqual(obj.boolField);
+    expect(obj2.bytesField).toEqual(obj.bytesField);
+  })
+  it('cel field complex transform', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let serConfig: AvroSerializerConfig = {
+      useLatestVersion: true
+    }
+    let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
+
+    let encRule: Rule = {
+      name: 'test-cel',
+      kind: 'TRANSFORM',
+      mode: RuleMode.WRITE,
+      type: 'CEL_FIELD',
+      expr: "typeName == 'STRING' ; value + '-suffix'",
+    }
+    let ruleSet: RuleSet = {
+      domainRules: [encRule]
+    }
+
+    let info: SchemaInfo = {
+      schemaType: 'AVRO',
+      schema: complexSchema,
+      ruleSet
+    }
+
+    await client.register(subject, info, false)
+
+    let obj = {
+      arrayField: [ 'hello' ],
+      mapField: { 'key': 'world' },
+      unionField: 'bye',
+    }
+    let bytes = await ser.serialize(topic, obj)
+
+    let deserConfig: AvroDeserializerConfig = {
+    }
+    let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
+    let obj2 = await deser.deserialize(topic, bytes)
+    expect(obj2.arrayField).toEqual([ 'hello-suffix' ]);
+    expect(obj2.mapField).toEqual({ 'key': 'world-suffix' });
+    expect(obj2.unionField).toEqual('bye-suffix');
+  })
+  it('cel field complex transform with null', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let serConfig: AvroSerializerConfig = {
+      useLatestVersion: true
+    }
+    let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
+
+    let encRule: Rule = {
+      name: 'test-cel',
+      kind: 'TRANSFORM',
+      mode: RuleMode.WRITE,
+      type: 'CEL_FIELD',
+      expr: "typeName == 'STRING' ; value + '-suffix'",
+    }
+    let ruleSet: RuleSet = {
+      domainRules: [encRule]
+    }
+
+    let info: SchemaInfo = {
+      schemaType: 'AVRO',
+      schema: complexSchema,
+      ruleSet
+    }
+
+    await client.register(subject, info, false)
+
+    let obj = {
+      arrayField: [ 'hello' ],
+      mapField: { 'key': 'world' },
+      unionField: null,
+    }
+    let bytes = await ser.serialize(topic, obj)
+
+    let deserConfig: AvroDeserializerConfig = {
+    }
+    let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
+    let obj2 = await deser.deserialize(topic, bytes)
+    expect(obj2.arrayField).toEqual([ 'hello-suffix' ]);
+    expect(obj2.mapField).toEqual({ 'key': 'world-suffix' });
+    expect(obj2.unionField).toEqual(null);
+  })
+  it('cel field condition', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let serConfig: AvroSerializerConfig = {
+      useLatestVersion: true,
+    }
+    let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
+
+    let encRule: Rule = {
+      name: 'test-cel',
+      kind: 'CONDITION',
+      mode: RuleMode.WRITE,
+      type: 'CEL_FIELD',
+      expr: "name == 'stringField' ; value == 'hi'"
+    }
+    let ruleSet: RuleSet = {
+      domainRules: [encRule]
+    }
+
+    let info: SchemaInfo = {
+      schemaType: 'AVRO',
+      schema: demoSchema,
+      ruleSet
+    }
+
+    await client.register(subject, info, false)
+
+    let obj = {
+      intField: 123,
+      doubleField: 45.67,
+      stringField: 'hi',
+      boolField: true,
+      bytesField: Buffer.from([1, 2]),
+    }
+    let bytes = await ser.serialize(topic, obj)
+
+    let deserConfig: AvroDeserializerConfig = {
+      ruleConfig: {
+        secret: 'mysecret'
+      }
+    }
+    let deser = new AvroDeserializer(client, SerdeType.VALUE, deserConfig)
+    let obj2 = await deser.deserialize(topic, bytes)
+    expect(obj2.intField).toEqual(obj.intField);
+    expect(obj2.doubleField).toBeCloseTo(obj.doubleField, 0.001);
+    expect(obj2.stringField).toEqual(obj.stringField);
+    expect(obj2.boolField).toEqual(obj.boolField);
+    expect(obj2.bytesField).toEqual(obj.bytesField);
+  })
+  it('cel field condition fail', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let serConfig: AvroSerializerConfig = {
+      useLatestVersion: true,
+    }
+    let ser = new AvroSerializer(client, SerdeType.VALUE, serConfig)
+
+    let encRule: Rule = {
+      name: 'test-cel',
+      kind: 'CONDITION',
+      mode: RuleMode.WRITE,
+      type: 'CEL_FIELD',
+      expr: "name == 'stringField' ; value != 'hi'"
+    }
+    let ruleSet: RuleSet = {
+      domainRules: [encRule]
+    }
+
+    let info: SchemaInfo = {
+      schemaType: 'AVRO',
+      schema: demoSchema,
+      ruleSet
+    }
+
+    await client.register(subject, info, false)
+
+    let obj = {
+      intField: 123,
+      doubleField: 45.67,
+      stringField: 'hi',
+      boolField: true,
+      bytesField: Buffer.from([1, 2]),
+    }
+    try {
+      await ser.serialize(topic, obj)
+      expect(true).toBe(false)
+    } catch (err) {
+      expect(err).toBeInstanceOf(SerializationError)
+    }
+  })
  it('basic encryption', async () => {
    let conf: ClientConfig = {
      baseURLs: [baseURL],
diff --git a/schemaregistry/test/serde/json.spec.ts b/schemaregistry/test/serde/json.spec.ts
index f50f5ece..45624944 100644
--- a/schemaregistry/test/serde/json.spec.ts
+++ b/schemaregistry/test/serde/json.spec.ts
@@ -1,6 +1,11 @@
import {afterEach, describe, expect, it} from '@jest/globals';
import {ClientConfig} from "../../rest-service";
-import {SerdeType, SerializationError, Serializer} from "../../serde/serde";
+import {
+  HeaderSchemaIdSerializer,
+  SerdeType,
+  SerializationError,
+  Serializer
+} from "../../serde/serde";
import {
  Client,
  Rule,
@@ -242,6 +247,31 @@ describe('JsonSerializer', () => {
    let obj2 = await deser.deserialize(topic, bytes)
    expect(obj2).toEqual(obj)
  })
+  it('guid in header', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let ser = new JsonSerializer(client, SerdeType.VALUE, {
+      autoRegisterSchemas: true,
+      validate: true,
+      schemaIdSerializer: HeaderSchemaIdSerializer
+    })
+    let obj = {
+      intField: 123,
+      doubleField: 45.67,
+      stringField: 'hi',
+      boolField: true,
+      bytesField: Buffer.from([0, 0, 0, 1]).toString('base64')
+    }
+    let headers = {}
+    let bytes = await ser.serialize(topic, obj, headers)
+
+    let deser = new JsonDeserializer(client, SerdeType.VALUE, {})
+    let obj2 = await deser.deserialize(topic, bytes, headers)
+    expect(obj2).toEqual(obj)
+  })
  it('serialize nested', async () => {
    let conf: ClientConfig = {
      baseURLs: [baseURL],
diff --git a/schemaregistry/test/serde/protobuf.spec.ts b/schemaregistry/test/serde/protobuf.spec.ts
index 5604024d..74806ee0 100644
--- a/schemaregistry/test/serde/protobuf.spec.ts
+++ b/schemaregistry/test/serde/protobuf.spec.ts
@@ -4,7 +4,7 @@ import {
  ProtobufDeserializer, ProtobufDeserializerConfig,
  ProtobufSerializer, ProtobufSerializerConfig,
} from "../../serde/protobuf";
-import {SerdeType} from "../../serde/serde";
+import {HeaderSchemaIdSerializer, SerdeType} from "../../serde/serde";
import {
  Rule,
  RuleMode,
@@ -65,6 +65,28 @@ describe('ProtobufSerializer', () => {
    let obj2 = await deser.deserialize(topic, bytes)
    expect(obj2).toEqual(obj)
  })
+  it('guid in header', async () => {
+    let conf: ClientConfig = {
+      baseURLs: [baseURL],
+      cacheCapacity: 1000
+    }
+    let client = SchemaRegistryClient.newClient(conf)
+    let ser = new ProtobufSerializer(client, SerdeType.VALUE,
+      {autoRegisterSchemas: true, schemaIdSerializer: HeaderSchemaIdSerializer})
+    ser.registry.add(AuthorSchema)
+    let obj = create(AuthorSchema, {
+      name: 'Kafka',
+      id: 123,
+      picture: Buffer.from([1, 2]),
+      works: ['The Castle', 'The Trial']
+    })
+    let headers = {}
+    let bytes = await ser.serialize(topic, obj, headers)
+
+    let deser = new ProtobufDeserializer(client, SerdeType.VALUE, {})
+    let obj2 = await deser.deserialize(topic, bytes, headers)
+    expect(obj2).toEqual(obj)
+  })
  it('serialize second message', async () => {
    let conf: ClientConfig = {
      baseURLs: [baseURL],
diff --git a/schemaregistry/test/serde/serde.spec.ts b/schemaregistry/test/serde/serde.spec.ts
new file mode 100644
index 00000000..bd97b141
--- /dev/null
+++ b/schemaregistry/test/serde/serde.spec.ts
@@ -0,0 +1,69 @@
+import { describe, expect, it } from '@jest/globals';
+import {SchemaId} from "../../serde/serde";
+
+describe('SchemaGuid', () => {
+  it('schema guid', () => {
+    const schemaId = new SchemaId("AVRO")
+    const input = new Uint8Array([
+      0x01, 0x89, 0x79, 0x17, 0x62, 0x23, 0x36, 0x41, 0x86, 0x96, 0x74, 0x29, 0x9b, 0x90,
+      0xa8, 0x02, 0xe2,
+    ])
+    schemaId.fromBytes(Buffer.from(input))
+    const guid = schemaId.guid
+    expect(guid).toEqual("89791762-2336-4186-9674-299b90a802e2")
+
+    const output = new Uint8Array(schemaId.guidToBytes())
+    for (let i = 0; i < output.length; i++) {
+      expect(output[i]).toEqual(input[i])
+    }
+  })
+  it('schema id', () => {
+    const schemaId = new SchemaId("AVRO")
+    const input = new Uint8Array([
+      0x00, 0x00, 0x00, 0x00, 0x01,
+    ])
+    schemaId.fromBytes(Buffer.from(input))
+    const id = schemaId.id
+    expect(id).toEqual(1)
+
+    const output = new Uint8Array(schemaId.idToBytes())
+    for (let i = 0; i < output.length; i++) {
+      expect(output[i]).toEqual(input[i])
+    }
+  })
+  it('schema guid with message indexes', () => {
+    const schemaId = new SchemaId("PROTOBUF")
+    const input = new Uint8Array([
+      0x01, 0x89, 0x79, 0x17, 0x62, 0x23, 0x36, 0x41, 0x86, 0x96, 0x74, 0x29, 0x9b, 0x90,
+      0xa8, 0x02, 0xe2, 0x06, 0x02, 0x04, 0x06,
+    ])
+    schemaId.fromBytes(Buffer.from(input))
+    const guid = schemaId.guid
+    expect(guid).toEqual("89791762-2336-4186-9674-299b90a802e2")
+
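+    // message indexes are zigzag-encoded varints: 0x06 0x02 0x04 0x06 decodes to count 3, indexes [1, 2, 3]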
+    const msgIndexes = schemaId.messageIndexes
+    expect(msgIndexes).toEqual([1, 2, 3])
+
+    const output = new Uint8Array(schemaId.guidToBytes())
+    for (let i = 0; i < output.length; i++) {
+      expect(output[i]).toEqual(input[i])
+    }
+  })
+  it('schema id with message indexes', () => {
+    const schemaId = new SchemaId("PROTOBUF")
+    const input = new Uint8Array([
+      0x00, 0x00, 0x00, 0x00, 0x01, 0x06, 0x02, 0x04, 0x06,
+    ])
+    schemaId.fromBytes(Buffer.from(input))
+    const id = schemaId.id
+    expect(id).toEqual(1)
+
+    const msgIndexes = schemaId.messageIndexes
+    expect(msgIndexes).toEqual([1, 2, 3])
+
+    const output = new Uint8Array(schemaId.idToBytes())
+    for (let i = 0; i < output.length; i++) {
+      expect(output[i]).toEqual(input[i])
+    }
+  })
+})
diff --git a/schemaregistry/tsconfig.json b/schemaregistry/tsconfig.json
index a409cdc7..28f8466b 100644
--- a/schemaregistry/tsconfig.json
+++ b/schemaregistry/tsconfig.json
@@ -25,6 +25,8 @@
    "skipLibCheck": true
  },
  "include": [
+    "*.ts",
+    "**/*.ts",
    "test/**/*",
    "e2e/**/*"
  ],
diff --git a/service.yml b/service.yml
index fb34272e..a00418b3 100644
--- a/service.yml
+++ b/service.yml
@@ -10,6 +10,7 @@ codeowners:
  enable: true
semaphore:
  enable: true
+  pipeline_enable: false
  tasks:
    - name: post-install
      scheduled: false
diff --git a/test/promisified/admin/list_topics.spec.js b/test/promisified/admin/list_topics.spec.js
index 77a3447b..25fa6c36 100644
--- a/test/promisified/admin/list_topics.spec.js
+++ b/test/promisified/admin/list_topics.spec.js
@@ -26,10 +26,20 @@ describe('Admin > listTopics', () => {
  it('should timeout', async () => {
    await admin.connect();

-    await expect(admin.listTopics({ timeout: 1 })).rejects.toHaveProperty(
-      'code',
-      ErrorCodes.ERR__TIMED_OUT
-    );
+    while (true) {
+      try {
+        await admin.listTopics({ timeout: 0.00001 });
+        jest.fail('Should have thrown an error');
+      } catch (e) {
+        if (e.code === ErrorCodes.ERR__TRANSPORT)
+          continue;
+        expect(e).toHaveProperty(
+          'code',
+          ErrorCodes.ERR__TIMED_OUT
+        );
+        break;
+      }
+    }
  });

  it('should list consumer topics', async () => {
diff --git a/test/promisified/consumer/consumeMessages.spec.js b/test/promisified/consumer/consumeMessages.spec.js
index 93a3e694..c34a04db 100644
--- a/test/promisified/consumer/consumeMessages.spec.js
+++ b/test/promisified/consumer/consumeMessages.spec.js
@@ -412,19 +412,6 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit
      partitions: partitions,
    });

-    /* Reconfigure producer and consumer in such a way that batches are likely
-     * to be small and we get multiple partitions in the cache at once.
-     * This is to reduce flakiness.
-     */
-    producer = createProducer({}, {
-      'batch.num.messages': 1,
-    });
-
-    consumer = createConsumer({
-      'groupId': groupId,
-    }, {
-      'fetch.message.max.bytes': 1,
-    });
-
    await producer.connect();
    await consumer.connect();
    await consumer.subscribe({ topic: topicName });
@@ -451,7 +438,7 @@ describe.each(cases)('Consumer - partitionsConsumedConcurrently = %s -', (partit

    await waitFor(() => consumer.assignment().length > 0, () => { }, 100);

-    const messages = Array(1024*9)
+    const messages = Array(4096 * 3)
      .fill()
      .map((_, i) => {
        const value = secureRandom(512);
diff --git a/test/promisified/consumer/rebalanceCallback.spec.js b/test/promisified/consumer/rebalanceCallback.spec.js
index 9269e19b..2659f538 100644
--- a/test/promisified/consumer/rebalanceCallback.spec.js
+++ b/test/promisified/consumer/rebalanceCallback.spec.js
@@ -18,6 +18,7 @@ describe('Consumer', () => {
    groupId = `consumer-group-id-${secureRandom()}`;
    consumerConfig = {
      groupId,
+      fromBeginning: true,
    };
    consumer = null;
    await createTopic({ topic: topicName, partitions: 3 });
diff --git a/test/promisified/consumer/seek.spec.js b/test/promisified/consumer/seek.spec.js
index 17e1c749..93de9596 100644
--- a/test/promisified/consumer/seek.spec.js
+++ b/test/promisified/consumer/seek.spec.js
@@ -179,6 +179,9 @@ describe('Consumer seek >', () => {
        topic: topicName,
        messages: [message1, message2, message3, message4],
      });
+      // Avoids an offset validation that would reset the offset
+      // set by a subsequent seek
+      await producer.flush();

      await consumer.subscribe({ topic: topicName });
@@ -305,6 +308,10 @@ describe('Consumer seek >', () => {
      const message3 = { key: `key-0`, value: `value-${value3}`, partition: 0 };
      await producer.send({ topic: topicName, messages: [message1, message2, message3] });

+      // Avoids an offset validation that would reset the offset
+      // set by a subsequent seek
+      await producer.flush();
+
      await consumer.subscribe({ topic: topicName, });

      const messagesConsumed = [];
@@ -348,6 +355,11 @@ describe('Consumer seek >', () => {
    });

    describe('batch staleness >', () => {
+      beforeEach(async () => {
+        // These tests expect a single partition
+        await createTopic({ topic: topicName, partitions: 1 });
+      });
+
      it('stops consuming messages after staleness', async () => {
        consumer = createConsumer({
          groupId,
@@ -410,6 +422,13 @@ describe('Consumer seek >', () => {

      consumer.run({
        eachBatch: async ({ batch, isStale, resolveOffset }) => {
+          if (offsetsConsumed.length === 0 &&
+            batch.messages.length === 1) {
+            // Wait for a batch of at least two messages
+            resolveOffset(batch.messages[0].offset);
+            return;
+          }
+
          for (const message of batch.messages) {
            if (isStale()) break;
diff --git a/test/promisified/producer/flush.spec.js b/test/promisified/producer/flush.spec.js
index c4f7daf9..64a15b9f 100644
--- a/test/promisified/producer/flush.spec.js
+++ b/test/promisified/producer/flush.spec.js
@@ -75,7 +75,7 @@ describe('Producer > Flush', () => {
    let messageSent = false;

    /* Larger number of messages */
-    producer.send({ topic: topicName, messages: Array(100).fill(message) }).then(() => {
+    producer.send({ topic: topicName, messages: Array(1000).fill(message) }).then(() => {
      messageSent = true;
    });