diff --git a/build/common.props b/build/common.props index 0cae42a5..58dd0758 100644 --- a/build/common.props +++ b/build/common.props @@ -1,7 +1,7 @@ - 4.1.2$(VersionSuffix) + 4.1.3$(VersionSuffix) netstandard2.0 Microsoft Microsoft diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs index 92dccd58..825f4381 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Output/KafkaProducerFactory.cs @@ -88,9 +88,17 @@ private IKafkaProducer Create(Handle producerBaseHandle, KafkaProducerEntity ent var schemaRegistryUrl = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.SchemaRegistryUrl); var schemaRegistryUsername = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.SchemaRegistryUsername); var schemaRegistryPassword = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.SchemaRegistryPassword); - - var valueSerializer = SerializationHelper.ResolveValueSerializer(valueType, valueAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); - var keySerializer = SerializationHelper.ResolveValueSerializer(keyType, keyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + var topic = this.config.ResolveSecureSetting(nameResolver, entity.Attribute.Topic); + + (var valueSerializer, var keySerializer) = SerializationHelper.ResolveSerializers( + valueType, + keyType, + valueAvroSchema, + keyAvroSchema, + schemaRegistryUrl, + schemaRegistryUsername, + schemaRegistryPassword, + topic); return (IKafkaProducer)Activator.CreateInstance( typeof(KafkaProducer<,>).MakeGenericType(keyType, valueType), diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs index 82ee42f5..843a8e0c 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/LocalSchemaRegistry.cs @@ -12,25 +12,31 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka /// public class LocalSchemaRegistry : ISchemaRegistryClient { - private readonly string schema; + private readonly string valueSchema; + private readonly string keySchema; + private const int ValueId = 1; + private const int KeyId = 2; + private string valueSubject; + private string keySubject; private List subjects = new List(); - public LocalSchemaRegistry(string schema) + public LocalSchemaRegistry(string valueSchema, string keySchema = null) { - this.schema = schema; + this.valueSchema = valueSchema; + this.keySchema = keySchema; } public int MaxCachedSchemas { get { - return 1; + return 2; } } - public string ConstructKeySubjectName(string topic, string recordType = null) => $"{topic}-key"; + public string ConstructKeySubjectName(string topic, string recordType = null) => keySubject = $"{topic}-key"; - public string ConstructValueSubjectName(string topic, string recordType = null) => topic; + public string ConstructValueSubjectName(string topic, string recordType = null) => valueSubject = $"{topic}-value"; public void Dispose() { @@ -38,7 +44,7 @@ public void Dispose() public Task> GetAllSubjectsAsync() { - return Task.FromResult(this.subjects); + return Task.FromResult(subjects); } public Task GetCompatibilityAsync(string subject = null) @@ -58,13 +64,28 @@ public Task GetRegisteredSchemaAsync(string subject, int versi public Task GetSchemaAsync(string subject, int version) { - return Task.FromResult(this.schema); + if (subject == keySubject) + { + return Task.FromResult(keySchema); + } + else if (subject == valueSubject) + { + return Task.FromResult(valueSchema); + } + return Task.FromResult(null); } public Task GetSchemaAsync(int id, string format = null) { - var schema = new Schema(this.schema, SchemaType.Avro); - return Task.FromResult(schema); + if (id == KeyId) + { + return Task.FromResult(new Schema(keySchema, SchemaType.Avro)); + } + else if (id == ValueId) + { + return Task.FromResult(new Schema(valueSchema, SchemaType.Avro)); + } + return Task.FromResult(null); } public Task GetSchemaIdAsync(string subject, string schema) diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs index 99925e46..ab2c6a82 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Serialization/SerializationHelper.cs @@ -17,25 +17,110 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka { internal static class SerializationHelper { - internal static object ResolveDeserializer(Type type, string specifiedAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword) + internal static (object, object) ResolveDeserializers(GetKeyAndValueTypesResult keyAndValueTypes, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword, string topic) { - if (typeof(IMessage).IsAssignableFrom(type)) + object valueDeserializer = null; + object keyDeserializer = null; + + var valueType = keyAndValueTypes.ValueType; + var keyType = keyAndValueTypes.KeyType; + var specifiedValueAvroSchema = keyAndValueTypes.ValueAvroSchema; + var specifiedKeyAvroSchema = keyAndValueTypes.KeyAvroSchema; + + // call helper if schema registry is specified and return avro deserializers + if (schemaRegistryUrl != null) + { + return ResolveSchemaRegistryDeserializers(valueType, keyType, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword, topic); + } + + // check for protobuf deserialization + if (typeof(IMessage).IsAssignableFrom(valueType)) + { + valueDeserializer = Activator.CreateInstance(typeof(ProtobufDeserializer<>).MakeGenericType(valueType)); + } + if (typeof(IMessage).IsAssignableFrom(keyType)) { - return Activator.CreateInstance(typeof(ProtobufDeserializer<>).MakeGenericType(type)); + keyDeserializer = Activator.CreateInstance(typeof(ProtobufDeserializer<>).MakeGenericType(keyType)); } - var isSpecificRecord = typeof(ISpecificRecord).IsAssignableFrom(type); - if (!isSpecificRecord && !typeof(GenericRecord).IsAssignableFrom(type) && schemaRegistryUrl == null) + var isValueSpecificRecord = typeof(ISpecificRecord).IsAssignableFrom(valueType); + var isValueGenericRecord = typeof(GenericRecord).IsAssignableFrom(valueType); + var isKeySpecificRecord = typeof(ISpecificRecord).IsAssignableFrom(keyType); + var isKeyGenericRecord = typeof(GenericRecord).IsAssignableFrom(keyType); + // create schemas for specific records + if (!string.IsNullOrWhiteSpace(specifiedValueAvroSchema) && isValueSpecificRecord) + { + specifiedValueAvroSchema = ((ISpecificRecord)Activator.CreateInstance(valueType)).Schema.ToString(); + } + if (!string.IsNullOrWhiteSpace(specifiedKeyAvroSchema) && isKeySpecificRecord) { - return null; + specifiedKeyAvroSchema = ((ISpecificRecord)Activator.CreateInstance(keyType)).Schema.ToString(); } - var schemaRegistry = CreateSchemaRegistry(type, specifiedAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword, isSpecificRecord); + // check for avro deserialization + if (specifiedValueAvroSchema != null || specifiedKeyAvroSchema != null) + { + // creates local schema registry if no schema registry url is specified + var schemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + + // if value avro schema exists and value type is generic record, create avro deserializer + if (!string.IsNullOrWhiteSpace(specifiedValueAvroSchema)) + { + if (isValueGenericRecord || isValueSpecificRecord) + { + var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroValueDeserializer), BindingFlags.Static | BindingFlags.NonPublic); + var genericMethod = methodInfo.MakeGenericMethod(valueType); + valueDeserializer = genericMethod.Invoke(null, new object[] { schemaRegistry }); + } + else + { + throw new ArgumentException($"Value type {valueType.FullName} is not a valid data type when avro schema is provided. It must be a GenericRecord or ISpecificRecord."); + } + } + + // if key avro schema exists and key type is a generic record, create avro deserializer + if (!string.IsNullOrWhiteSpace(specifiedKeyAvroSchema)) + { + if (isKeyGenericRecord || isKeySpecificRecord) + { + var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroKeyDeserializer), BindingFlags.Static | BindingFlags.NonPublic); + var genericMethod = methodInfo.MakeGenericMethod(keyType); + keyDeserializer = genericMethod.Invoke(null, new object[] { schemaRegistry }); + } + else + { + throw new ArgumentException($"Key type {keyType.FullName} is not a valid data type when avro schema is provided. It must be a GenericRecord or ISpecificRecord."); + } + } + } + + return (valueDeserializer, keyDeserializer); + } + + internal static (object, object) ResolveSchemaRegistryDeserializers(Type valueType, Type keyType, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword, string topic) + { + object valueDeserializer = null; + object keyDeserializer = null; + + var schemaRegistry = CreateSchemaRegistry(null, null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + + if (typeof(GenericRecord).IsAssignableFrom(valueType)) + { + // retrieve schema and create deserializer + var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroValueDeserializer), BindingFlags.Static | BindingFlags.NonPublic); + var genericMethod = methodInfo.MakeGenericMethod(valueType); + valueDeserializer = genericMethod.Invoke(null, new object[] { schemaRegistry }); + } - var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroValueDeserializer), BindingFlags.Static | BindingFlags.NonPublic); - var genericMethod = methodInfo.MakeGenericMethod(type); + // if keyType is genericRecord, create avro deserializer + if (typeof(GenericRecord).IsAssignableFrom(keyType)) + { + var methodInfo = typeof(SerializationHelper).GetMethod(nameof(CreateAvroKeyDeserializer), BindingFlags.Static | BindingFlags.NonPublic); + var genericMethod = methodInfo.MakeGenericMethod(keyType); + keyDeserializer = genericMethod.Invoke(null, new object[] { schemaRegistry }); + } - return genericMethod.Invoke(null, new object[] { schemaRegistry }); + return (valueDeserializer, keyDeserializer); } private static IDeserializer CreateAvroValueDeserializer(ISchemaRegistryClient schemaRegistry) @@ -48,35 +133,93 @@ private static IDeserializer CreateAvroKeyDeserializer(ISchemaRegist return new AvroDeserializer(schemaRegistry).AsSyncOverAsync(); } - internal static object ResolveValueSerializer(Type valueType, string specifiedAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword) + internal static (object, object) ResolveSerializers(Type valueType, Type keyType, string specifiedValueAvroSchema, string specifiedKeyAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword, string topic) { + object keySerializer = null; + object valueSerializer = null; + + // call helper if schema registry is specified and return avro serializers + if (schemaRegistryUrl != null) + { + return ResolveSchemaRegistrySerializers(valueType, keyType, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword, topic); + } + + // create serializers for protobuf if (typeof(IMessage).IsAssignableFrom(valueType)) { - return Activator.CreateInstance(typeof(ProtobufSerializer<>).MakeGenericType(valueType)); + valueSerializer = Activator.CreateInstance(typeof(ProtobufSerializer<>).MakeGenericType(valueType)); + if (typeof(IMessage).IsAssignableFrom(keyType)) + { + keySerializer = Activator.CreateInstance(typeof(ProtobufSerializer<>).MakeGenericType(keyType)); + } + return (valueSerializer, keySerializer); } - var isSpecificRecord = typeof(ISpecificRecord).IsAssignableFrom(valueType); - if (!isSpecificRecord && !typeof(GenericRecord).IsAssignableFrom(valueType) && schemaRegistryUrl == null) + var isValueSpecificRecord = typeof(ISpecificRecord).IsAssignableFrom(valueType); + var isValueGenericRecord = typeof(GenericRecord).IsAssignableFrom(valueType); + var isKeySpecificRecord = typeof(ISpecificRecord).IsAssignableFrom(keyType); + var isKeyGenericRecord = typeof(GenericRecord).IsAssignableFrom(keyType); + // create schemas for specific records + if (!string.IsNullOrWhiteSpace(specifiedValueAvroSchema) && isValueSpecificRecord) { - return null; + specifiedValueAvroSchema = ((ISpecificRecord)Activator.CreateInstance(valueType)).Schema.ToString(); + } + if (!string.IsNullOrWhiteSpace(specifiedKeyAvroSchema) && isKeySpecificRecord) + { + specifiedKeyAvroSchema = ((ISpecificRecord)Activator.CreateInstance(keyType)).Schema.ToString(); } - var schemaRegistry = CreateSchemaRegistry(valueType, specifiedAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword, isSpecificRecord); + // check for avro serialization + if (specifiedValueAvroSchema != null || specifiedKeyAvroSchema != null) + { + // create schema registry client + var schemaRegistry = CreateSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + object serializer = null; - var serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), schemaRegistry, null /* config */); - return typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(valueType).Invoke(null, new object[] { serializer }); + // create serializers for avro - generic or specific records + if (isValueGenericRecord || isValueSpecificRecord) + { + serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), schemaRegistry, null /* config */); + valueSerializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(valueType).Invoke(null, new object[] { serializer }); + } + if (isKeyGenericRecord || isKeySpecificRecord) + { + serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(keyType), schemaRegistry, null /* config */); + keySerializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(keyType).Invoke(null, new object[] { serializer }); + } + } + + return (valueSerializer, keySerializer); } - private static ISchemaRegistryClient CreateSchemaRegistry(Type valueType, string specifiedAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword, bool isSpecificRecord) + internal static (object, object) ResolveSchemaRegistrySerializers(Type valueType, Type keyType, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword, string topic) { - if (string.IsNullOrWhiteSpace(specifiedAvroSchema) && isSpecificRecord) + object valueSerializer = null; + object keySerializer = null; + + var schemaRegistry = CreateSchemaRegistry(null, null, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); + + if (typeof(GenericRecord).IsAssignableFrom(valueType) || typeof(ISpecificRecord).IsAssignableFrom(valueType)) { - specifiedAvroSchema = ((ISpecificRecord)Activator.CreateInstance(valueType)).Schema.ToString(); + var serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(valueType), schemaRegistry, null /* config */); + valueSerializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(valueType).Invoke(null, new object[] { serializer }); } - if (!string.IsNullOrWhiteSpace(specifiedAvroSchema)) + if (typeof(GenericRecord).IsAssignableFrom(keyType) || typeof(ISpecificRecord).IsAssignableFrom(keyType)) { - return new LocalSchemaRegistry(specifiedAvroSchema); + var serializer = Activator.CreateInstance(typeof(AvroSerializer<>).MakeGenericType(keyType), schemaRegistry, null /* config */); + keySerializer = typeof(SyncOverAsyncSerializerExtensionMethods).GetMethod("AsSyncOverAsync").MakeGenericMethod(keyType).Invoke(null, new object[] { serializer }); + } + + return (valueSerializer, keySerializer); + } + + + private static ISchemaRegistryClient CreateSchemaRegistry(string specifiedValueAvroSchema, string specifiedKeyAvroSchema, string schemaRegistryUrl, string schemaRegistryUsername, string schemaRegistryPassword) + { + if (!string.IsNullOrWhiteSpace(specifiedValueAvroSchema) || !string.IsNullOrWhiteSpace(specifiedKeyAvroSchema)) + { + return new LocalSchemaRegistry(specifiedValueAvroSchema, specifiedKeyAvroSchema); } if (schemaRegistryUrl != null) { @@ -88,7 +231,7 @@ private static ISchemaRegistryClient CreateSchemaRegistry(Type valueType, string } return new CachedSchemaRegistryClient(schemaRegistryConfig.ToArray()); } - throw new ArgumentNullException(nameof(specifiedAvroSchema), $@"parameter is required when creating an generic avro serializer"); + throw new ArgumentNullException(nameof(specifiedValueAvroSchema), $@"parameter is required when creating an generic avro serializer"); } internal class GetKeyAndValueTypesResult @@ -143,6 +286,8 @@ internal static GetKeyAndValueTypesResult GetKeyAndValueTypes(string valueAvroSc } } + // if schema registry is present, the types must be generic too? + (valueType, valueAvroSchema) = GetTypeAndSchema(valueType, valueAvroSchemaFromAttribute); } diff --git a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs index 987b4682..61944bb8 100644 --- a/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs +++ b/src/Microsoft.Azure.WebJobs.Extensions.Kafka/Trigger/KafkaTriggerAttributeBindingProvider.cs @@ -56,9 +56,8 @@ public Task TryCreateAsync(TriggerBindingProviderContext contex var schemaRegistryUrl = this.config.ResolveSecureSetting(nameResolver, attribute.SchemaRegistryUrl); var schemaRegistryUsername = this.config.ResolveSecureSetting(nameResolver, attribute.SchemaRegistryUsername); var schemaRegistryPassword = this.config.ResolveSecureSetting(nameResolver, attribute.SchemaRegistryPassword); - var valueDeserializer = SerializationHelper.ResolveDeserializer(keyAndValueTypes.ValueType, keyAndValueTypes.ValueAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); - var keyDeserializer = SerializationHelper.ResolveDeserializer(keyAndValueTypes.KeyType, keyAndValueTypes.KeyAvroSchema, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword); - + var topic = this.config.ResolveSecureSetting(nameResolver, attribute.Topic); + (var valueDeserializer, var keyDeserializer) = SerializationHelper.ResolveDeserializers(keyAndValueTypes, schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword, topic); var consumerConfig = CreateConsumerConfiguration(attribute); var binding = CreateBindingStrategyFor(keyAndValueTypes.KeyType ?? typeof(Ignore), keyAndValueTypes.ValueType, keyAndValueTypes.RequiresKey, valueDeserializer, keyDeserializer, parameter, consumerConfig); return Task.FromResult(new KafkaTriggerBindingWrapper(binding)); diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/Constants.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/Constants.cs index bc25df2b..956ac650 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/Constants.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/Constants.cs @@ -8,8 +8,10 @@ internal static class Constants internal const string StringTopicWithTenPartitionsName = "stringTopicTenPartitions"; internal const string StringTopicWithLongKeyAndTenPartitionsName = "stringTopicWithLongKeyTenPartitions"; internal const string MyAvroRecordTopicName = "myAvroRecordTopic"; + internal const string MyKeyAvroRecordTopicName = "myKeyAvroRecordTopic"; internal const string MyProtobufTopicName = "myProtobufTopic"; internal const string SchemaRegistryTopicName = "schemaRegistryTopic"; + internal const string SchemaRegistryNoKeyTopicName = "schemaRegistryNoKeyTopic"; internal const string ConsumerGroupID = "e2e_tests"; internal const string SchemaRegistryUrl = "localhost:8081"; } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTestFixture.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTestFixture.cs index bc5729bc..09e777ce 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTestFixture.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTestFixture.cs @@ -30,8 +30,14 @@ public class KafkaEndToEndTestFixture : IAsyncLifetime internal TopicSpecification MyAvroRecordTopic { get; } = new TopicSpecification() { Name = Constants.MyAvroRecordTopicName, NumPartitions = 10, ReplicationFactor = 1 }; + internal TopicSpecification MyKeyAvroRecordTopic { get; } = new TopicSpecification() { Name = Constants.MyKeyAvroRecordTopicName, NumPartitions = 10, ReplicationFactor = 1 }; + internal TopicSpecification MyProtobufTopic { get; } = new TopicSpecification() { Name = Constants.MyProtobufTopicName, NumPartitions = 10, ReplicationFactor = 1 }; + internal TopicSpecification SchemaRegistryTopic { get; } = new TopicSpecification() { Name = Constants.SchemaRegistryTopicName, NumPartitions = 1, ReplicationFactor = 1 }; + + internal TopicSpecification schemaRegistryNoKeyTopic { get; } = new TopicSpecification() { Name = Constants.SchemaRegistryNoKeyTopicName, NumPartitions = 1, ReplicationFactor = 1 }; + public KafkaEndToEndTestFixture() { var config = new ConfigurationBuilder() diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTests.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTests.cs index b9d2baf2..49c06d71 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTests.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaEndToEndTests.cs @@ -1079,7 +1079,116 @@ await TestHelpers.Await(() => await host.StopAsync(); } + } + + [Fact] + public async Task SingleItem_With_AvroKey() + { + const int producedMessagesCount = 5; + var messageMasterPrefix = Guid.NewGuid().ToString(); + + var loggerProvider = CreateTestLoggerProvider(); + + using (var host = await StartHostAsync( + new[] { typeof(SingleItem_GenericAvroValue_With_GenericAvroKey_Trigger), typeof(KafkaOutputFunctions) }, + loggerProvider)) + { + var jobHost = host.GetJobHost(); + + // Call the output function that produces messages with Avro keys + await jobHost.CallAsync( + GetStaticMethod(typeof(KafkaOutputFunctions), + nameof(KafkaOutputFunctions.Produce_AsyncCollector_GenericRecordKeyValue)), + new { + topic = Constants.MyKeyAvroRecordTopicName, + ids = Enumerable.Range(1, producedMessagesCount).Select(x => x), + content = Enumerable.Range(1, producedMessagesCount).Select(x => $"{messageMasterPrefix}-{x}") + }); + + await TestHelpers.Await(() => + { + var foundCount = loggerProvider.GetAllUserLogMessages() + .Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains(messageMasterPrefix)); + return foundCount == producedMessagesCount; + }); + + // Give time for the commit to be saved + await Task.Delay(1500); + + await host.StopAsync(); + } + } + + [Fact] + public async Task SingleItem_With_AvroKey_SchemaRegistry() + { + const int producedMessagesCount = 5; + var messageMasterPrefix = Guid.NewGuid().ToString(); + + var loggerProvider = CreateTestLoggerProvider(); + + using (var host = await StartHostAsync( + new[] { typeof(SingleItem_GenericAvroValue_With_GenericAvroKey_SchemaRegistryURL), typeof(KafkaOutputFunctions) }, + loggerProvider)) + { + var jobHost = host.GetJobHost(); + // Call the output function that produces messages with Avro keys using SchemaRegistry + await jobHost.CallAsync( + GetStaticMethod(typeof(KafkaOutputFunctions), + nameof(KafkaOutputFunctions.Produce_AsyncCollector_GenericRecordKeyValue_With_SchemaRegistry)), + new { + topic = Constants.MyKeyAvroRecordTopicName, + ids = Enumerable.Range(1, producedMessagesCount).Select(x => x), + content = Enumerable.Range(1, producedMessagesCount).Select(x => $"{messageMasterPrefix}-{x}") + }); + + await TestHelpers.Await(() => + { + var foundCount = loggerProvider.GetAllUserLogMessages() + .Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains(messageMasterPrefix)); + return foundCount == producedMessagesCount; + }); + + // Give time for the commit to be saved + await Task.Delay(1500); + + await host.StopAsync(); + } + } + + [Fact] + public async Task SingleItem_With_NoKey_SchemaRegistry() + { + const int producedMessagesCount = 5; + var messageMasterPrefix = Guid.NewGuid().ToString(); + + var loggerProvider = CreateTestLoggerProvider(); + + using (var host = await StartHostAsync( + new[] { typeof(SingleItem_With_Schema_Registry_No_Key), typeof(KafkaOutputFunctions) }, + loggerProvider)) + { + var jobHost = host.GetJobHost(); + + await jobHost.CallAsync( + GetStaticMethod(typeof(KafkaOutputFunctions), + nameof(KafkaOutputFunctions.Produce_AsyncCollector_GenericRecord_NoKey_With_SchemaRegistry)), + new { + topic = Constants.SchemaRegistryNoKeyTopicName, + content = Enumerable.Range(1, producedMessagesCount).Select(x => $"{messageMasterPrefix}-{x}") + }); + + await TestHelpers.Await(() => + { + var foundCount = loggerProvider.GetAllUserLogMessages() + .Count(p => p.FormattedMessage != null && p.FormattedMessage.Contains(messageMasterPrefix)); + return foundCount == producedMessagesCount; + }); + + await Task.Delay(1500); + await host.StopAsync(); + } } } } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaOutputFunctions.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaOutputFunctions.cs index 3a6d52ae..89d1e131 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaOutputFunctions.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/KafkaOutputFunctions.cs @@ -288,5 +288,104 @@ IAsyncCollector> output await output.AddAsync(message); } + + public static async Task Produce_AsyncCollector_GenericRecordKeyValue( + string topic, + IEnumerable ids, + IEnumerable content, + [Kafka(BrokerList = "LocalBroker", AvroSchema = MyAvroRecord.SchemaText, KeyAvroSchema = MyKeyAvroRecord.SchemaText)] IAsyncCollector> output) + { + // Parse the schemas + var keySchema = (RecordSchema)Schema.Parse(MyKeyAvroRecord.SchemaText); + var valueSchema = (RecordSchema)Schema.Parse(MyAvroRecord.SchemaText); + + var idsEnumerator = ids.GetEnumerator(); + foreach (var c in content) + { + idsEnumerator.MoveNext(); + + // Create key GenericRecord + var keyRecord = new GenericRecord(keySchema); + keyRecord.Add("id", idsEnumerator.Current); + keyRecord.Add("type", "message-" + DateTime.UtcNow.Ticks % 100); + + // Create value GenericRecord + var valueRecord = new GenericRecord(valueSchema); + valueRecord.Add("id", c); + valueRecord.Add("ticks", DateTime.UtcNow.Ticks); + + var message = new KafkaEventData() + { + Key = keyRecord, + Topic = topic, + Value = valueRecord, + }; + + await output.AddAsync(message); + } + } + + public static async Task Produce_AsyncCollector_GenericRecordKeyValue_With_SchemaRegistry( + string topic, + IEnumerable ids, + IEnumerable content, + [Kafka(BrokerList = "LocalBroker", AvroSchema = MyAvroRecord.SchemaText, KeyAvroSchema = MyKeyAvroRecord.SchemaText, SchemaRegistryUrl = Constants.SchemaRegistryUrl)] + IAsyncCollector> output) + { + // Parse the schemas + var keySchema = (RecordSchema)Schema.Parse(MyKeyAvroRecord.SchemaText); + var valueSchema = (RecordSchema)Schema.Parse(MyAvroRecord.SchemaText); + + var idsEnumerator = ids.GetEnumerator(); + foreach (var c in content) + { + idsEnumerator.MoveNext(); + + // Create key GenericRecords + var keyRecord = new GenericRecord(keySchema); + keyRecord.Add("id", idsEnumerator.Current); + keyRecord.Add("type", "message-" + DateTime.UtcNow.Ticks % 100); + + // Create value GenericRecord + var valueRecord = new GenericRecord(valueSchema); + valueRecord.Add("id", c); + valueRecord.Add("ticks", DateTime.UtcNow.Ticks); + + var message = new KafkaEventData() + { + Key = keyRecord, + Topic = topic, + Value = valueRecord, + }; + + await output.AddAsync(message); + } + } + + public static async Task Produce_AsyncCollector_GenericRecord_NoKey_With_SchemaRegistry( + string topic, + IEnumerable content, + [Kafka(BrokerList = "LocalBroker", SchemaRegistryUrl = Constants.SchemaRegistryUrl)] + IAsyncCollector> output) + { + // Parse the schema + var valueSchema = (RecordSchema)Schema.Parse(MyAvroRecord.SchemaText); + + foreach (var c in content) + { + // Create value GenericRecord + var valueRecord = new GenericRecord(valueSchema); + valueRecord.Add("id", c); + valueRecord.Add("ticks", DateTime.UtcNow.Ticks); + + var message = new KafkaEventData() + { + Topic = topic, + Value = valueRecord, + }; + + await output.AddAsync(message); + } + } } } \ No newline at end of file diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/Models/MyKeyAvroRecord.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/Models/MyKeyAvroRecord.cs new file mode 100644 index 00000000..5fff84a2 --- /dev/null +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/Models/MyKeyAvroRecord.cs @@ -0,0 +1,57 @@ +// Copyright (c) .NET Foundation. All rights reserved. +// Licensed under the MIT License. See License.txt in the project root for license information. + +using Avro; +using Avro.Specific; +using Newtonsoft.Json; + +namespace Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests +{ + public class MyKeyAvroRecord : ISpecificRecord + { + public const string SchemaText = @" + { + ""type"": ""record"", + ""name"": ""MyKeyAvroRecord"", + ""namespace"": ""Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests"", + ""fields"": [ + { + ""name"": ""id"", + ""type"": ""int"" + }, + { + ""name"": ""type"", + ""type"": ""string"" + } + ] + }"; + public static Schema _SCHEMA = Schema.Parse(SchemaText); + + [JsonIgnore] + public virtual Schema Schema => _SCHEMA; + public string ID { get; set; } + public string Type { get; set; } + + public virtual object Get(int fieldPos) + { + switch (fieldPos) + { + case 0: return this.ID; + case 1: return this.Type; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Get()"); + } + ; + } + public virtual void Put(int fieldPos, object fieldValue) + { + switch (fieldPos) + { + case 0: this.ID = (string)fieldValue; break; + case 1: this.Type = (string)fieldValue; break; + default: throw new AvroRuntimeException("Bad index " + fieldPos + " in Put()"); + } + ; + } + + } +} \ No newline at end of file diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs index f22487e4..1299b2c5 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/TriggerFunctions.cs @@ -4,6 +4,7 @@ using System; using System.Diagnostics; using System.Linq; +using System.Reflection.Emit; using System.Text; using Avro.Generic; using Confluent.Kafka; @@ -329,4 +330,57 @@ public static void Trigger( log.LogInformation(kafkaEvent.Value.ToString()); } } + + internal static class SingleItem_With_Schema_Registry_No_Key + { + public static void Trigger( + [KafkaTrigger("LocalBroker", Constants.SchemaRegistryNoKeyTopicName, ConsumerGroup = Constants.ConsumerGroupID, SchemaRegistryUrl = Constants.SchemaRegistryUrl)] KafkaEventData kafkaEvent, + ILogger log) + { + log.LogInformation(kafkaEvent.Value.ToString()); + } + } + + // Tests for key avro schema + internal static class SingleItem_GenericAvroValue_With_GenericAvroKey_Trigger + { + public static void Trigger( + [KafkaTrigger("LocalBroker", Constants.MyKeyAvroRecordTopicName, ConsumerGroup = Constants.ConsumerGroupID, AvroSchema = MyAvroRecord.SchemaText, KeyAvroSchema = MyKeyAvroRecord.SchemaText)] KafkaEventData kafkaEvent, + ILogger log) + { + var myRecord = kafkaEvent.Value; + var myKey = kafkaEvent.Key; + if (myRecord == null) + { + throw new Exception("MyAvro record is null"); + } + log.LogInformation($"Value: {kafkaEvent.Value.ToString()}"); + if (myKey == null) + { + throw new Exception("MyAvro key is null"); + } + log.BeginScope($"Key: {myKey.ToString()}"); + } + } + + internal static class SingleItem_GenericAvroValue_With_GenericAvroKey_SchemaRegistryURL + { + public static void Trigger( + [KafkaTrigger("LocalBroker", Constants.MyKeyAvroRecordTopicName, ConsumerGroup = Constants.ConsumerGroupID, SchemaRegistryUrl = Constants.SchemaRegistryUrl)] KafkaEventData kafkaEvent, + ILogger log) + { + var myRecord = kafkaEvent.Value; + var myKey = kafkaEvent.Key; + if (myRecord == null) + { + throw new Exception("MyAvro record is null"); + } + log.LogInformation($"Value: {kafkaEvent.Value.ToString()}"); + if (myKey == null) + { + throw new Exception("MyAvro key is null"); + } + log.BeginScope($"Key: {myKey.ToString()}"); + } + } } diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/kafka-singlenode-compose.yaml b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/kafka-singlenode-compose.yaml index 1cad45a6..774fe253 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/kafka-singlenode-compose.yaml +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/kafka-singlenode-compose.yaml @@ -1,12 +1,6 @@ --- version: '2' services: - zookeeper: - image: confluentinc/cp-zookeeper:latest - environment: - ZOOKEEPER_CLIENT_PORT: 2181 - ZOOKEEPER_TICK_TIME: 2000 - kafka: # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,- # An important note about accessing Kafka from clients on other machines: @@ -26,26 +20,34 @@ services: # "`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,-'"`-._,- # image: confluentinc/cp-kafka:latest - depends_on: - - zookeeper + hostname: kafka ports: - 9092:9092 environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_NODE_ID: 1 + KAFKA_PROCESS_ROLES: broker,controller + KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT + KAFKA_JMX_PORT: 9101 + KAFKA_JMX_HOSTNAME: localhost + KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:9092 KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 + KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 + KAFKA_LOG_DIRS: /tmp/kraft-combined-logs + KAFKA_KRAFT_CLUSTER_ID: Smqq4VQPTXahsrIWU98xeQ + CLUSTER_ID: Smqq4VQPTXahsrIWU98xeQ + schema-registry: - image: confluentinc/cp-schema-registry + image: confluentinc/cp-schema-registry:latest depends_on: - - zookeeper - kafka ports: - 8081:8081 environment: - SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8081" - SCHEMA_REGISTRY_HOST_NAME: "localhost" - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: zookeeper:2181 - SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:29092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + SCHEMA_REGISTRY_HOST_NAME: localhost + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092 diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/start-kafka-test-environment.sh b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/start-kafka-test-environment.sh index 32966b88..b956cebc 100755 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/start-kafka-test-environment.sh +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.EndToEndTests/start-kafka-test-environment.sh @@ -2,11 +2,158 @@ export COMPOSE_INTERACTIVE_NO_CLI=1 +echo "Installing docker-compose..." sudo apt install -y docker-compose -# start docker compose -docker-compose -f ./kafka-singlenode-compose.yaml up --build -d +echo "Starting Kafka using docker-compose..." +docker-compose -f ./kafka-singlenode-compose.yaml up --build -d +sleep 15 -# wait until kafka is ready to create topic -# need to improve, adding a retry instead of a static sleep -sleep 30 \ No newline at end of file +# Topic name to test +TEST_TOPIC="test-topic" +BOOTSTRAP_SERVER="localhost:9092" + +# Default names for containers +ZOOKEEPER_CONTAINER_NAME="zookeeper" +KAFKA_CONTAINER_NAME="kafka" +SCHEMA_REGISTRY_CONTAINER_NAME="schema-registry" + +# Function to check if all containers are running and start them if not +check_containers() { + echo "Checking if all containers are running..." + docker-compose -f ./kafka-singlenode-compose.yaml ps + # Get the list of services from docker-compose + services=$(docker-compose -f ./kafka-singlenode-compose.yaml config --services) + + # Check each service + all_running=true + for service in $services; do + services_running=$(docker-compose -f ./kafka-singlenode-compose.yaml ps --services --filter "status=running" $service) + + if [ "$service" == "kafka" ]; then + KAFKA_CONTAINER_NAME=$(docker-compose -f ./kafka-singlenode-compose.yaml ps -q $service | xargs docker inspect -f '{{.Name}}' | sed 's/^\///') + echo "Found Kafka container: $KAFKA_CONTAINER_NAME" + elif [ "$service" == "zookeeper" ]; then + ZOOKEEPER_CONTAINER_NAME=$(docker-compose -f ./kafka-singlenode-compose.yaml ps -q $service | xargs docker inspect -f '{{.Name}}' | sed 's/^\///') + echo "Found Zookeeper container: $ZOOKEEPER_CONTAINER_NAME" + elif [ "$service" == "schema-registry" ]; then + SCHEMA_REGISTRY_CONTAINER_NAME=$(docker-compose -f ./kafka-singlenode-compose.yaml ps -q $service | xargs docker inspect -f '{{.Name}}' | sed 's/^\///') + echo "Found Schema Registry container: $SCHEMA_REGISTRY_CONTAINER_NAME" + fi + + if [[ ! " $services_running " =~ (^|[[:space:]])$service($|[[:space:]]) ]]; then + echo "Container for service '$service' is not running." + all_running=false + + # Try to start the individual container + start_container_with_retry "$service" + + if [ $? -eq 0 ]; then + echo "Container for service '$service' is running." + all_running=true + else + echo "Failed to start container for service '$service'." + all_running=false + fi + else + echo "Container for service '$service' is running." + fi + done + + if [ "$all_running" = true ]; then + # Store container names for later use + return 0 + else + echo "All services are not running." + docker-compose -f ./kafka-singlenode-compose.yaml ps + return 1 + fi +} + +# Function to start a container with retry mechanism +start_container_with_retry() { + local service=$1 + local max_attempts=3 + local attempt=1 + local wait_time=15 + + while [ $attempt -le $max_attempts ]; do + echo "Starting '$service' - attempt $attempt of $max_attempts..." + + # Stop the container first if it exists but in a bad state + docker-compose -f ./kafka-singlenode-compose.yaml stop $service 2>/dev/null + + # Start the container + docker-compose -f ./kafka-singlenode-compose.yaml up -d $service + + echo "Waiting $wait_time seconds to let container start..." + sleep $wait_time + + services_running=$(docker-compose -f ./kafka-singlenode-compose.yaml ps --services --filter "status=running" $service) + + if [[ " $services_running " =~ (^|[[:space:]])$service($|[[:space:]]) ]]; then + echo "Successfully started '$service' on attempt $attempt." + return 0 + fi + + echo "Failed to start '$service' on attempt $attempt." + + # Show logs to help diagnose the issue + echo "Container logs for '$service':" + docker-compose -f ./kafka-singlenode-compose.yaml logs --tail=20 $service + + attempt=$((attempt + 1)) + done + + echo "Failed to start '$service' after $max_attempts attempts." + return 1 +} + +# Function to create test topic +create_test_topic() { + local attempts=0 + local max_attempts=5 + local wait_time=10 + + echo "Attempting to create test topic: $TEST_TOPIC" + + while [ $attempts -lt $max_attempts ]; do + echo "Creating topic - attempt $((attempts+1)) of $max_attempts" + + # Try to create the topic and capture output + output=$(docker exec -e LOG_LEVEL=ERROR $KAFKA_CONTAINER_NAME kafka-topics --create --if-not-exists --topic $TEST_TOPIC --bootstrap-server $BOOTSTRAP_SERVER --partitions 1 --replication-factor 1 2>&1) + result=$? + + if [ $result -eq 0 ]; then + echo "Successfully created topic: $TEST_TOPIC" + return 0 + else + attempts=$((attempts+1)) + echo "Failed to create topic (attempt $attempts/$max_attempts):" + echo "$output" + + if [ $attempts -ge $max_attempts ]; then + echo "Maximum number of attempts reached. Cannot create topic." + return 1 + fi + + echo "Waiting $wait_time seconds before retrying..." + sleep $wait_time + fi + done +} + +# Create a test topic if all containers are running +# Fix: Correct syntax for the if statement with check_containers +if ! check_containers; then + echo "Not all containers are running. Exiting." + exit 1 +fi +if ! create_test_topic; then + echo "Failed to create test topic after multiple attempts. Exiting." + exit 1 +fi + +# List topics to confirm +echo "Listing available Kafka topics:" +docker exec -e LOG_LEVEL=ERROR $KAFKA_CONTAINER_NAME kafka-topics --list --bootstrap-server $BOOTSTRAP_SERVER \ No newline at end of file diff --git a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs index 80efd379..9a619ee2 100644 --- a/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs +++ b/test/Microsoft.Azure.WebJobs.Extensions.Kafka.UnitTests/KafkaProducerFactoryTest.cs @@ -70,60 +70,68 @@ public void When_No_Type_Is_Set_Should_Create_ByteArray_Producer() Assert.Null(typedProducer.ValueSerializer); } - [Fact] - public void When_String_Value_Type_Is_Set_Should_Create_String_Listener() + [Theory] + [InlineData(typeof(System.String), typeof(Null), null, null)] + [InlineData(typeof(System.String), typeof(System.String), null, null)] + [InlineData(typeof(GenericRecord), typeof(Null), "fakeValueAvroSchema", null)] + [InlineData(typeof(GenericRecord), typeof(System.String), "fakeValueAvroSchema", null)] + [InlineData(typeof(GenericRecord), typeof(GenericRecord), "fakeAvroSchema", "fakeKeyAvroSchema")] + public void When_Schema_Is_Provided_Should_Create_Type_Listener(Type valueType, Type keyType, string valueAvroSchema, string keyAvroSchema) { var attribute = new KafkaAttribute("brokers:9092", "myTopic") { + AvroSchema = valueAvroSchema, + KeyAvroSchema = keyAvroSchema }; var entity = new KafkaProducerEntity() { Attribute = attribute, - ValueType = typeof(string), + ValueType = valueType, + KeyType = keyType, + ValueAvroSchema = attribute.AvroSchema, + KeyAvroSchema = attribute.KeyAvroSchema }; var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance); var producer = factory.Create(entity); Assert.NotNull(producer); - Assert.IsType>(producer); - var typedProducer = (KafkaProducer)producer; - Assert.Null(typedProducer.ValueSerializer); - } + var expectedType = typeof(KafkaProducer<,>).MakeGenericType(keyType, valueType); + Assert.IsType(expectedType, producer); - [Fact] - public void When_Avro_Schema_Is_Provided_Should_Create_GenericRecord_Listener() - { - var attribute = new KafkaAttribute("brokers:9092", "myTopic") - { - AvroSchema = "fakeAvroSchema" - }; + dynamic typedProducer = Convert.ChangeType(producer, expectedType); + Assert.NotNull(typedProducer); - var entity = new KafkaProducerEntity() + if (valueAvroSchema != null) { - Attribute = attribute, - ValueType = typeof(GenericRecord), - ValueAvroSchema = attribute.AvroSchema, - }; - - var factory = new KafkaProducerFactory(emptyConfiguration, new DefaultNameResolver(emptyConfiguration), NullLoggerFactory.Instance); - var producer = factory.Create(entity); + Assert.NotNull(typedProducer.ValueSerializer); + var valueSerializerType = typeof(SyncOverAsyncSerializer<>).MakeGenericType(valueType); + Assert.IsType(valueSerializerType, typedProducer.ValueSerializer); + } + else + { + Assert.Null(typedProducer.ValueSerializer); + } - Assert.NotNull(producer); - Assert.IsType>(producer); - var typedProducer = (KafkaProducer)producer; - Assert.NotNull(typedProducer.ValueSerializer); - //Assert.IsType>(typedProducer.ValueSerializer); - Assert.IsType>(typedProducer.ValueSerializer); + if (keyAvroSchema != null) + { + Assert.NotNull(typedProducer.KeySerializer); + var keySerializerType = typeof(SyncOverAsyncSerializer<>).MakeGenericType(keyType); + Assert.IsType(keySerializerType, typedProducer.KeySerializer); + } + else + { + Assert.Null(typedProducer.KeySerializer); + } } [Fact] - public void When_Schema_Registry_Is_Provided_Should_Create_GenericRecord_Listener() + public void When_Schema_Registry_Is_Provided_Should_Create_GenericRecord_Producer() { var attribute = new KafkaAttribute("brokers:9092", "myTopic"); - attribute.SchemaRegistryUrl = "localhost"; + attribute.SchemaRegistryUrl = "localhost:8081"; var entity = new KafkaProducerEntity() {